diff --git a/Cargo.lock b/Cargo.lock index 66b37cf4..fce47a31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2658,9 +2658,9 @@ dependencies = [ [[package]] name = "json-event-parser" -version = "0.2.0" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f850fafca79ebacd70eab9d80cb75a33aeda38bde8f3dd784c1837cdf0bde631" +checksum = "73267b6bffa5356bd46cfa89386673e9a7f62f4eb3adcb45b1bd031892357853" [[package]] name = "kernel32-sys" @@ -3129,6 +3129,18 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54b4ed3a7192fa19f5f48f99871f2755047fabefd7f222f12a1df1773796a102" +[[package]] +name = "oxjsonld" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13a1a66dc569350f3f4e5eff8a8e1a72b0c9e6ad395bb5805493cb7a2fda185f" +dependencies = [ + "json-event-parser", + "oxiri", + "oxrdf", + "thiserror 2.0.9", +] + [[package]] name = "oxrdf" version = "0.2.4" @@ -3137,17 +3149,17 @@ checksum = "a04761319ef84de1f59782f189d072cbfc3a9a40c4e8bded8667202fbd35b02a" dependencies = [ "oxilangtag", "oxiri", - "oxsdatatypes", "rand 0.8.5", "thiserror 2.0.9", ] [[package]] name = "oxrdfio" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47d45d81e8e5313e13f88da6c040f5edcd5aa48e91d1cdf0972f5bc79362890d" +checksum = "14d33dd87769786a0bb7de342865e33bf0c6e9872fa76f1ede23e944fdc77898" dependencies = [ + "oxjsonld", "oxrdf", "oxrdfxml", "oxttl", @@ -3156,9 +3168,9 @@ dependencies = [ [[package]] name = "oxrdfxml" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e00a36021b71d74281cf9ffbbda57210bb9cb84b7f28bda50a3ee9641b6ce5d" +checksum = "d8d4bf9c5331127f01efbd1245d90fd75b7c546a97cb3e95461121ce1ad5b1c8" dependencies = [ "oxilangtag", "oxiri", @@ -3178,9 +3190,9 @@ dependencies = [ [[package]] name = "oxttl" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bc2f05b9179f1033582fe3ae58a96cb4311dd1a2116791c3236117d5427f275" +checksum = "0d385f1776d7cace455ef6b7c54407838eff902ca897303d06eb12a26f4cf8a0" dependencies = [ "memchr", "oxilangtag", @@ -3736,7 +3748,6 @@ dependencies = [ "rdf-fusion-encoding", "rdf-fusion-functions", "rdf-fusion-model", - "spargebra", ] [[package]] @@ -3746,6 +3757,7 @@ dependencies = [ "oxiri", "oxrdf", "oxsdatatypes", + "spargebra", "thiserror 2.0.9", ] @@ -4223,9 +4235,9 @@ dependencies = [ [[package]] name = "sparesults" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b845f8f37245959132c0561bb737c28eb9c0b693b92535fc5c3965a9390724d" +checksum = "f478f5ead16b6136bccee7a52ea43a615f8512086708f515e26ce33e0b184036" dependencies = [ "json-event-parser", "memchr", diff --git a/Cargo.toml b/Cargo.toml index c24e2bf3..5e96b1d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,9 +58,12 @@ uuid = "1.14.0" zip-extract = "0.3.0" # Upstream Oxigraph dependencies -oxttl = "0.1.7" -oxrdfio = "0.1.7" -sparesults = "0.2.4" +oxttl = "0.1.8" +oxiri = "0.2.11" +oxrdf = "0.2.4" +oxrdfio = "0.1.8" +oxsdatatypes = "0.2.2" +sparesults = "0.2.5" spargebra = "0.3.5" # Internal dependencies diff --git a/lib/common/src/blank_node_mode.rs b/lib/common/src/blank_node_mode.rs new file mode 100644 index 00000000..1fbd5e09 --- /dev/null +++ b/lib/common/src/blank_node_mode.rs @@ -0,0 +1,9 @@ +/// TODO +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)] +pub enum BlankNodeMatchingMode { + /// A blank node is interpreted as a variable. + #[default] + Variable, + /// A blank node is interpreted as a constant filter. + Filter, +} diff --git a/lib/common/src/lib.rs b/lib/common/src/lib.rs index 8d38ad02..f00e4f09 100644 --- a/lib/common/src/lib.rs +++ b/lib/common/src/lib.rs @@ -1,8 +1,10 @@ extern crate core; +mod blank_node_mode; pub mod error; mod quad_storage; +pub use blank_node_mode::BlankNodeMatchingMode; pub use quad_storage::QuadPatternEvaluator; pub use quad_storage::QuadStorage; diff --git a/lib/common/src/quad_storage.rs b/lib/common/src/quad_storage.rs index c01ebc59..8737115a 100644 --- a/lib/common/src/quad_storage.rs +++ b/lib/common/src/quad_storage.rs @@ -1,12 +1,12 @@ use crate::error::StorageError; -use crate::DFResult; +use crate::{BlankNodeMatchingMode, DFResult}; use async_trait::async_trait; use datafusion::datasource::TableProvider; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_planner::ExtensionPlanner; use rdf_fusion_model::{ - GraphNameRef, NamedNodeRef, NamedOrBlankNode, NamedOrBlankNodeRef, Quad, QuadRef, SubjectRef, - TermRef, + GraphName, GraphNameRef, NamedOrBlankNode, NamedOrBlankNodeRef, Quad, QuadRef, TriplePattern, + Variable, }; use std::fmt::Debug; use std::sync::Arc; @@ -72,17 +72,14 @@ pub trait QuadStorage: Send + Sync { pub trait QuadPatternEvaluator: Debug + Send + Sync { /// Returns a stream of quads that match the given pattern. /// - /// The resulting stream must have a schema that is compatible with the default schema for - /// quads. Each emitted batch should have `batch_size` elements. - /// - /// While currently we can only filter for constant patterns, in the future this method - /// should be able to evaluate arbitrary patterns (i.e., including variables). - fn quads_for_pattern( + /// The resulting stream must have a schema that projects to the variables provided in the + /// arguments. Each emitted batch should have `batch_size` elements. + fn evaluate_pattern( &self, - graph: GraphNameRef<'_>, - subject: Option>, - predicate: Option>, - object: Option>, + graph: GraphName, + graph_variable: Option, + pattern: TriplePattern, + blank_node_mode: BlankNodeMatchingMode, batch_size: usize, ) -> DFResult; } diff --git a/lib/engine/src/engine.rs b/lib/engine/src/engine.rs index 94b46446..ea5ff2a7 100644 --- a/lib/engine/src/engine.rs +++ b/lib/engine/src/engine.rs @@ -80,7 +80,7 @@ impl RdfFusionInstance { /// Checks whether `quad` is contained in the instance. pub async fn contains(&self, quad: &QuadRef<'_>) -> DFResult { let active_graph_info = graph_name_to_active_graph(Some(quad.graph_name)); - let pattern_plan = RdfFusionLogicalPlanBuilder::new_from_quads( + let pattern_plan = RdfFusionLogicalPlanBuilder::new_from_matching_quads( Arc::clone(&self.functions), active_graph_info, Some(quad.subject.into_owned()), @@ -109,7 +109,7 @@ impl RdfFusionInstance { object: Option>, ) -> DFResult { let active_graph_info = graph_name_to_active_graph(graph_name); - let pattern_plan = RdfFusionLogicalPlanBuilder::new_from_quads( + let pattern_plan = RdfFusionLogicalPlanBuilder::new_from_matching_quads( Arc::clone(&self.functions), active_graph_info, subject.map(SubjectRef::into_owned), diff --git a/lib/engine/src/sparql/rewriting/graph_pattern_rewriter.rs b/lib/engine/src/sparql/rewriting/graph_pattern_rewriter.rs index cbc93d43..bb9f4adc 100644 --- a/lib/engine/src/sparql/rewriting/graph_pattern_rewriter.rs +++ b/lib/engine/src/sparql/rewriting/graph_pattern_rewriter.rs @@ -186,14 +186,14 @@ impl GraphPatternRewriter { object, } => { let state = self.state.borrow(); - RdfFusionLogicalPlanBuilder::new_from_property_path( + Ok(RdfFusionLogicalPlanBuilder::new_from_property_path( Arc::clone(&self.registry), state.active_graph.clone(), state.graph_name_var.clone(), path.clone(), subject.clone(), object.clone(), - ) + )) } GraphPattern::Minus { left, right } => { let left = self.rewrite_graph_pattern(left)?; diff --git a/lib/logical/Cargo.toml b/lib/logical/Cargo.toml index 495b650e..00e856e6 100644 --- a/lib/logical/Cargo.toml +++ b/lib/logical/Cargo.toml @@ -12,7 +12,6 @@ rdf-fusion-encoding.workspace = true rdf-fusion-functions.workspace = true datafusion.workspace = true rdf-fusion-model.workspace = true -spargebra.workspace = true [lints] workspace = true diff --git a/lib/logical/src/lib.rs b/lib/logical/src/lib.rs index 9f56b20a..c4ff4a13 100644 --- a/lib/logical/src/lib.rs +++ b/lib/logical/src/lib.rs @@ -9,7 +9,7 @@ mod logical_plan_builder; pub mod minus; pub mod paths; pub mod patterns; -pub mod quads; +pub mod quad_pattern; pub use active_graph::{ActiveGraph, EnumeratedActiveGraph}; use datafusion::common::{plan_err, DFSchema}; diff --git a/lib/logical/src/logical_plan_builder.rs b/lib/logical/src/logical_plan_builder.rs index 854a1bb2..f6c5c287 100644 --- a/lib/logical/src/logical_plan_builder.rs +++ b/lib/logical/src/logical_plan_builder.rs @@ -3,11 +3,12 @@ use crate::extend::ExtendNode; use crate::join::{SparqlJoinNode, SparqlJoinType}; use crate::minus::MinusNode; use crate::paths::PropertyPathNode; -use crate::patterns::PatternNode; -use crate::quads::QuadsNode; +use crate::quad_pattern::QuadPatternNode; use crate::{RdfFusionExprBuilder, RdfFusionExprBuilderRoot}; use datafusion::arrow::datatypes::{DataType, Field, Fields}; -use datafusion::common::{Column, DFSchema, DFSchemaRef}; +use datafusion::common::{Column, DFSchema, DFSchemaRef, DataFusionError}; +use datafusion::logical_expr::builder::project; +use datafusion::logical_expr::select_expr::SelectExpr; use datafusion::logical_expr::{ col, lit, Expr, ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder, SortExpr, UserDefinedLogicalNode, Values, @@ -15,11 +16,16 @@ use datafusion::logical_expr::{ use rdf_fusion_common::DFResult; use rdf_fusion_encoding::plain_term::encoders::DefaultPlainTermEncoder; use rdf_fusion_encoding::plain_term::PlainTermEncoding; -use rdf_fusion_encoding::{EncodingName, EncodingScalar, TermEncoder, TermEncoding}; +use rdf_fusion_encoding::typed_value::DEFAULT_QUAD_DFSCHEMA; +use rdf_fusion_encoding::{ + EncodingName, EncodingScalar, TermEncoder, TermEncoding, COL_GRAPH, COL_OBJECT, COL_PREDICATE, + COL_SUBJECT, +}; use rdf_fusion_functions::registry::RdfFusionFunctionRegistryRef; -use rdf_fusion_model::{NamedNode, Subject, Term, TermRef, ThinError, Variable}; -use spargebra::algebra::PropertyPathExpression; -use spargebra::term::{GroundTerm, NamedNodePattern, TermPattern, TriplePattern}; +use rdf_fusion_model::{ + GroundTerm, NamedNode, NamedNodePattern, PropertyPathExpression, Subject, Term, TermPattern, + TermRef, ThinError, TriplePattern, Variable, +}; use std::collections::HashMap; use std::sync::Arc; @@ -51,24 +57,96 @@ impl RdfFusionLogicalPlanBuilder { /// Creates a new [RdfFusionLogicalPlanBuilder] that matches Quads. /// /// The `active_graph` dictates which graphs should be considered, while the optional constants - /// (`subject`, `predicate`, `object`) allows filtering the resulting solution sequence. + /// (`subject`, `predicate`, `object`) allow filtering the resulting solution sequence. /// - /// This does not allow you to bind values to custom variable. See [Self::new_from_pattern] for + /// This does not allow you to bind values to variables. See [Self::new_from_pattern] for /// this purpose. - pub fn new_from_quads( + #[allow(clippy::expect_used, reason = "Indicates programming error")] + pub fn new_from_matching_quads( registry: RdfFusionFunctionRegistryRef, active_graph: ActiveGraph, subject: Option, predicate: Option, object: Option, ) -> Self { - let node = QuadsNode::new(active_graph, subject, predicate, object); + let partial_quads = Self::create_pattern_node_from_constants( + active_graph, + subject.clone(), + predicate.clone(), + object.clone(), + ); + let filled_quads = + Self::fill_quads_with_constants(partial_quads, subject, predicate, object) + .expect("Variables are fixed, Terms are encodable"); + + assert_eq!( + filled_quads.schema().as_ref(), + DEFAULT_QUAD_DFSCHEMA.as_ref(), + "Unexpected schema for matching quads." + ); + Self { - plan_builder: create_extension_plan(node), + plan_builder: LogicalPlanBuilder::new(filled_quads), registry, } } + /// Creates a pattern node for the constant values provided. + /// + /// If a constant is `None`, the default name of the column (e.g., `?subject`) is used for the + /// pattern. + fn create_pattern_node_from_constants( + active_graph: ActiveGraph, + subject: Option, + predicate: Option, + object: Option, + ) -> QuadPatternNode { + let triple_pattern = TriplePattern { + subject: subject.map_or( + TermPattern::Variable(Variable::new_unchecked(COL_SUBJECT)), + |s| TermPattern::from(Term::from(s)), + ), + predicate: predicate.map_or( + NamedNodePattern::Variable(Variable::new_unchecked(COL_PREDICATE)), + NamedNodePattern::from, + ), + object: object.map_or( + TermPattern::Variable(Variable::new_unchecked(COL_OBJECT)), + TermPattern::from, + ), + }; + + QuadPatternNode::new_with_blank_nodes_as_filter( + active_graph, + Some(Variable::new_unchecked(COL_GRAPH)), + triple_pattern, + ) + } + + /// Fills missing columns in the quads with the constants. + fn fill_quads_with_constants( + inner: QuadPatternNode, + subject: Option, + predicate: Option, + object: Option, + ) -> DFResult { + let graph = col(COL_GRAPH); + let subject = column_or_literal(subject, COL_SUBJECT)?; + let predicate = column_or_literal(predicate, COL_PREDICATE)?; + let object = column_or_literal(object, COL_OBJECT)?; + + let inner = LogicalPlan::Extension(Extension { + node: Arc::new(inner), + }); + + project( + inner, + [graph, subject, predicate, object] + .into_iter() + .map(SelectExpr::from), + ) + } + /// Creates a new [RdfFusionLogicalPlanBuilder] that that returns a single empty solution. pub fn new_with_empty_solution(registry: RdfFusionFunctionRegistryRef) -> Self { let plan_builder = LogicalPlanBuilder::empty(true); @@ -157,6 +235,7 @@ impl RdfFusionLogicalPlanBuilder { p.clone(), ) }) + .map(Ok) .reduce(|lhs, rhs| lhs?.join(rhs?.build()?, SparqlJoinType::Inner, None)) .unwrap_or_else(|| { Ok(RdfFusionLogicalPlanBuilder::new_with_empty_solution( @@ -182,36 +261,14 @@ impl RdfFusionLogicalPlanBuilder { pub fn new_from_pattern( registry: RdfFusionFunctionRegistryRef, active_graph: ActiveGraph, - graph_variables: Option, + graph_variable: Option, pattern: TriplePattern, - ) -> DFResult { - /// Constant patterns are already pushed into the quads node. Therefore, we can ignore them - /// here. - fn eliminate_constant_pattern(pattern: impl Into) -> Option { - match pattern.into() { - TermPattern::BlankNode(bnode) => Some(TermPattern::BlankNode(bnode)), - TermPattern::Variable(var) => Some(TermPattern::Variable(var)), - _ => None, - } - } - - let quads = construct_quads_node_for_pattern(active_graph, pattern.clone()); - let quads_plan = create_extension_plan(quads).build()?; - - let pattern = PatternNode::try_new( - quads_plan, - vec![ - graph_variables.map(TermPattern::Variable), - eliminate_constant_pattern(pattern.subject), - eliminate_constant_pattern(pattern.predicate), - eliminate_constant_pattern(pattern.object), - ], - )?; - - Ok(Self { - plan_builder: create_extension_plan(pattern), + ) -> Self { + let quads = QuadPatternNode::new(active_graph, graph_variable, pattern); + Self { + plan_builder: create_extension_plan(quads), registry, - }) + } } /// TODO @@ -222,12 +279,12 @@ impl RdfFusionLogicalPlanBuilder { path: PropertyPathExpression, subject: TermPattern, object: TermPattern, - ) -> DFResult { - let node = PropertyPathNode::new(active_graph, graph_variable, subject, path, object)?; - Ok(Self { + ) -> RdfFusionLogicalPlanBuilder { + let node = PropertyPathNode::new(active_graph, graph_variable, subject, path, object); + Self { registry, plan_builder: create_extension_plan(node), - }) + } } /// TODO @@ -463,28 +520,6 @@ impl RdfFusionLogicalPlanBuilder { } } -/// Creates a [QuadsNode] for the given pattern. -fn construct_quads_node_for_pattern( - active_graph: ActiveGraph, - pattern: TriplePattern, -) -> QuadsNode { - let subject = match pattern.subject { - TermPattern::NamedNode(nn) => Some(nn.into()), - _ => None, - }; - let predicate = match pattern.predicate { - NamedNodePattern::NamedNode(nn) => Some(nn), - NamedNodePattern::Variable(_) => None, - }; - let object = match pattern.object { - TermPattern::NamedNode(nn) => Some(nn.into()), - TermPattern::Literal(lit) => Some(lit.into()), - _ => None, - }; - - QuadsNode::new(active_graph, subject, predicate, object) -} - /// TODO fn create_distinct_on_expressions( expr_builder_root: RdfFusionExprBuilderRoot<'_>, @@ -518,3 +553,15 @@ fn create_extension_plan(node: impl UserDefinedLogicalNode + 'static) -> Logical node: Arc::new(node), })) } + +fn column_or_literal(term: Option>, col_name: &str) -> DFResult { + Ok(term + .map(|s| { + Ok::( + lit(PlainTermEncoding::encode_scalar(s.into().as_ref())?.into_scalar_value()) + .alias(col_name), + ) + }) + .transpose()? + .unwrap_or(col(col_name))) +} diff --git a/lib/logical/src/paths/path_node/logical.rs b/lib/logical/src/paths/path_node/logical.rs index 0e32b310..92dc8008 100644 --- a/lib/logical/src/paths/path_node/logical.rs +++ b/lib/logical/src/paths/path_node/logical.rs @@ -3,9 +3,8 @@ use crate::patterns::compute_schema_for_pattern; use crate::ActiveGraph; use datafusion::common::{plan_err, DFSchemaRef}; use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; -use rdf_fusion_common::DFResult; -use spargebra::algebra::PropertyPathExpression; -use spargebra::term::{TermPattern, Variable}; +use rdf_fusion_common::{BlankNodeMatchingMode, DFResult}; +use rdf_fusion_model::{PropertyPathExpression, TermPattern, Variable}; use std::cmp::Ordering; use std::fmt; @@ -26,16 +25,16 @@ impl PropertyPathNode { subject: TermPattern, path: PropertyPathExpression, object: TermPattern, - ) -> DFResult { - let schema = compute_schema(graph_name_var.as_ref(), &subject, &object)?; - Ok(Self { + ) -> Self { + let schema = compute_schema(graph_name_var.as_ref(), &subject, &object); + Self { active_graph, graph_name_var, subject, path, object, schema, - }) + } } pub fn active_graph(&self) -> &ActiveGraph { @@ -108,13 +107,13 @@ impl UserDefinedLogicalNodeCore for PropertyPathNode { if !exprs.is_empty() { return plan_err!("Expected 0 expressions but got {}", exprs.len()); } - Self::new( + Ok(Self::new( self.active_graph.clone(), self.graph_name_var.clone(), self.subject.clone(), self.path.clone(), self.object.clone(), - ) + )) } } @@ -122,11 +121,15 @@ fn compute_schema( graph: Option<&Variable>, subject: &TermPattern, object: &TermPattern, -) -> DFResult { +) -> DFSchemaRef { let patterns = vec![ graph.map(|v| TermPattern::Variable(v.clone())), Some(subject.clone()), Some(object.clone()), ]; - compute_schema_for_pattern(&PATH_TABLE_DFSCHEMA, &patterns) + compute_schema_for_pattern( + &PATH_TABLE_DFSCHEMA, + &patterns, + BlankNodeMatchingMode::Variable, + ) } diff --git a/lib/logical/src/paths/path_node/rewrite.rs b/lib/logical/src/paths/path_node/rewrite.rs index 39272458..684187c7 100644 --- a/lib/logical/src/paths/path_node/rewrite.rs +++ b/lib/logical/src/paths/path_node/rewrite.rs @@ -15,9 +15,10 @@ use rdf_fusion_common::DFResult; use rdf_fusion_encoding::typed_value::DEFAULT_QUAD_DFSCHEMA; use rdf_fusion_encoding::{COL_GRAPH, COL_OBJECT, COL_PREDICATE, COL_SUBJECT}; use rdf_fusion_functions::registry::RdfFusionFunctionRegistryRef; -use rdf_fusion_model::{NamedNode, TermRef}; -use spargebra::algebra::PropertyPathExpression; -use spargebra::term::{NamedNodePattern, TermPattern, TriplePattern, Variable}; +use rdf_fusion_model::{ + NamedNode, NamedNodePattern, PropertyPathExpression, TermPattern, TermRef, TriplePattern, + Variable, +}; use std::sync::Arc; #[derive(Debug)] @@ -286,7 +287,7 @@ impl PropertyPathLoweringRule { active_graph.clone(), Some(Variable::new_unchecked(COL_GRAPH)), pattern, - )?; + ); // Apply filter if present let builder = if let Some(filter) = filter { diff --git a/lib/logical/src/patterns/logical.rs b/lib/logical/src/patterns/logical.rs index edf36189..253b2501 100644 --- a/lib/logical/src/patterns/logical.rs +++ b/lib/logical/src/patterns/logical.rs @@ -1,8 +1,8 @@ use crate::patterns::compute_schema_for_pattern; use datafusion::common::{plan_err, DFSchemaRef}; use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; -use rdf_fusion_common::DFResult; -use spargebra::term::TermPattern; +use rdf_fusion_common::{BlankNodeMatchingMode, DFResult}; +use rdf_fusion_model::TermPattern; use std::cmp::Ordering; use std::fmt; @@ -31,7 +31,8 @@ impl PatternNode { // TODO: Check type - let schema = compute_schema_for_pattern(input.schema(), &patterns)?; + let schema = + compute_schema_for_pattern(input.schema(), &patterns, BlankNodeMatchingMode::Variable); Ok(Self { input, patterns, diff --git a/lib/logical/src/patterns/mod.rs b/lib/logical/src/patterns/mod.rs index b3556d92..557adb4b 100644 --- a/lib/logical/src/patterns/mod.rs +++ b/lib/logical/src/patterns/mod.rs @@ -4,17 +4,40 @@ mod rewrite; use datafusion::arrow::datatypes::{Field, Fields}; use datafusion::common::{DFSchema, DFSchemaRef}; pub use logical::*; -use rdf_fusion_common::DFResult; +use rdf_fusion_common::BlankNodeMatchingMode; +use rdf_fusion_encoding::typed_value::DEFAULT_QUAD_DFSCHEMA; +use rdf_fusion_model::{TermPattern, TriplePattern, VariableRef}; pub use rewrite::*; -use spargebra::term::TermPattern; use std::collections::{HashMap, HashSet}; use std::sync::Arc; /// TODO +pub fn compute_schema_for_triple_pattern( + graph_variable: Option>, + pattern: &TriplePattern, + blank_node_mode: BlankNodeMatchingMode, +) -> DFSchemaRef { + compute_schema_for_pattern( + &DEFAULT_QUAD_DFSCHEMA, + &vec![ + graph_variable + .as_ref() + .map(|v| TermPattern::Variable(v.into_owned())), + Some(pattern.subject.clone()), + Some(pattern.predicate.clone().into()), + Some(pattern.object.clone()), + ], + blank_node_mode, + ) +} + +/// TODO +#[allow(clippy::expect_used, reason = "Variables should not clash")] pub fn compute_schema_for_pattern( inner_schema: &DFSchema, patterns: &[Option], -) -> DFResult { + blank_node_mode: BlankNodeMatchingMode, +) -> DFSchemaRef { let mut seen: HashSet<&str> = HashSet::new(); let mut fields: Vec<(&str, &Field)> = Vec::new(); @@ -26,7 +49,10 @@ pub fn compute_schema_for_pattern( fields.push((variable.as_str(), field)); } } - Some(TermPattern::BlankNode(bnode)) => { + // A blank node only leads to an output variable if it is matched like a variable + Some(TermPattern::BlankNode(bnode)) + if blank_node_mode == BlankNodeMatchingMode::Variable => + { if !seen.contains(bnode.as_str()) { seen.insert(bnode.as_str()); fields.push((bnode.as_str(), field)); @@ -40,8 +66,8 @@ pub fn compute_schema_for_pattern( .into_iter() .map(|(name, field)| Field::new(name, field.data_type().clone(), field.is_nullable())) .collect::(); - Ok(Arc::new(DFSchema::from_unqualified_fields( - fields, - HashMap::new(), - )?)) + Arc::new( + DFSchema::from_unqualified_fields(fields, HashMap::new()) + .expect("Fields already deduplicated."), + ) } diff --git a/lib/logical/src/patterns/rewrite.rs b/lib/logical/src/patterns/rewrite.rs index 20a243ac..f6848675 100644 --- a/lib/logical/src/patterns/rewrite.rs +++ b/lib/logical/src/patterns/rewrite.rs @@ -9,7 +9,7 @@ use datafusion::optimizer::{OptimizerConfig, OptimizerRule}; use datafusion::prelude::Expr; use rdf_fusion_common::DFResult; use rdf_fusion_functions::registry::{RdfFusionFunctionRegistry, RdfFusionFunctionRegistryRef}; -use spargebra::term::{Term, TermPattern}; +use rdf_fusion_model::{Term, TermPattern}; use std::collections::{HashMap, HashSet}; #[derive(Debug)] @@ -112,6 +112,7 @@ fn filter_same_variable( .into_iter() .zip(pattern.iter()); for (column, pattern) in column_patterns { + // TODO: Support blank nodes? if let Some(TermPattern::Variable(variable)) = pattern { if !mappings.contains_key(variable) { mappings.insert(variable.clone(), Vec::new()); diff --git a/lib/logical/src/quad_pattern/logical.rs b/lib/logical/src/quad_pattern/logical.rs new file mode 100644 index 00000000..caad1a8a --- /dev/null +++ b/lib/logical/src/quad_pattern/logical.rs @@ -0,0 +1,183 @@ +use crate::active_graph::ActiveGraph; +use crate::patterns::compute_schema_for_triple_pattern; +use datafusion::common::{plan_err, DFSchemaRef}; +use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; +use rdf_fusion_common::{BlankNodeMatchingMode, DFResult}; +use rdf_fusion_encoding::typed_value::DEFAULT_QUAD_DFSCHEMA; +use rdf_fusion_encoding::{COL_GRAPH, COL_OBJECT, COL_PREDICATE, COL_SUBJECT}; +use rdf_fusion_model::{NamedNodePattern, TermPattern, TriplePattern, Variable, VariableRef}; +use std::cmp::Ordering; +use std::fmt; +use std::fmt::Formatter; +use std::sync::Arc; + +/// TODO +/// +/// ### Planning [QuadPatternNode] +/// +/// Planning the [QuadPatternNode] requires users to define a specialized planner for the used +/// storage layer. This is because the planner should consider storage-specific problems like +/// sharing a transaction across multiple scans of the quads table in a single query. The built-in +/// storage layers of RdfFusion provide examples. +#[derive(PartialEq, Eq, Hash)] +pub struct QuadPatternNode { + /// The active graph to query. + active_graph: ActiveGraph, + /// Whether to project the graph to a variable. + graph_variable: Option, + /// The triple pattern to match. + pattern: TriplePattern, + /// How to handle blank nodes in the pattern. + blank_node_mode: BlankNodeMatchingMode, + /// The schema of the result. + schema: DFSchemaRef, +} + +impl QuadPatternNode { + /// Creates a new [QuadPatternNode]. + pub fn new( + active_graph: ActiveGraph, + graph_variable: Option, + pattern: TriplePattern, + ) -> Self { + let schema = compute_schema_for_triple_pattern( + graph_variable.as_ref().map(|v| v.as_ref()), + &pattern, + BlankNodeMatchingMode::Variable, + ); + Self { + active_graph, + graph_variable, + blank_node_mode: BlankNodeMatchingMode::Variable, + pattern, + schema, + } + } + + /// Creates a new [QuadPatternNode]. + /// + /// Contrary to [Self::new], blank nodes are not treated as a variable. They are used for + /// filtering the quad set. + pub fn new_with_blank_nodes_as_filter( + active_graph: ActiveGraph, + graph_variable: Option, + pattern: TriplePattern, + ) -> Self { + let schema = compute_schema_for_triple_pattern( + graph_variable.as_ref().map(|v| v.as_ref()), + &pattern, + BlankNodeMatchingMode::Filter, + ); + Self { + active_graph, + graph_variable, + blank_node_mode: BlankNodeMatchingMode::Filter, + pattern, + schema, + } + } + + /// Creates a new [QuadPatternNode] that returns all quads in `active_graph` using the default + /// quads schema. + pub fn new_all_quads(active_graph: ActiveGraph) -> Self { + Self { + active_graph, + graph_variable: Some(Variable::new_unchecked(COL_GRAPH)), + pattern: TriplePattern { + subject: TermPattern::Variable(Variable::new_unchecked(COL_SUBJECT)), + predicate: NamedNodePattern::Variable(Variable::new_unchecked(COL_PREDICATE)), + object: TermPattern::Variable(Variable::new_unchecked(COL_OBJECT)), + }, + blank_node_mode: BlankNodeMatchingMode::Filter, // Doesn't matter here + schema: Arc::clone(&DEFAULT_QUAD_DFSCHEMA), + } + } + + /// The active graph to query. + pub fn active_graph(&self) -> &ActiveGraph { + &self.active_graph + } + + /// The result mode of the [QuadPatternNode]. + pub fn graph_variable(&self) -> Option> { + self.graph_variable.as_ref().map(|v| v.as_ref()) + } + + /// The triple pattern to match. + pub fn pattern(&self) -> &TriplePattern { + &self.pattern + } + + /// The blank node matching mode. + pub fn blank_node_mode(&self) -> BlankNodeMatchingMode { + self.blank_node_mode + } +} + +impl fmt::Debug for QuadPatternNode { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + UserDefinedLogicalNodeCore::fmt_for_explain(self, f) + } +} + +impl PartialOrd for QuadPatternNode { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + +impl UserDefinedLogicalNodeCore for QuadPatternNode { + fn name(&self) -> &str { + "QuadPattern" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "QuadPattern ")?; + + if self.active_graph != ActiveGraph::DefaultGraph { + write!(f, "active_graph: {} ", self.active_graph)?; + } + + write!(f, "(")?; + if let Some(graph_variable) = &self.graph_variable { + write!(f, "{graph_variable} ")?; + } + write!(f, "{})", &self.pattern) + } + + fn with_exprs_and_inputs(&self, exprs: Vec, inputs: Vec) -> DFResult { + if !inputs.is_empty() { + return plan_err!("QuadPatternNode has no inputs, got {}.", inputs.len()); + } + + if !exprs.is_empty() { + return plan_err!("QuadPatternNode has no expressions, got {}.", exprs.len()); + } + + let cloned = match self.blank_node_mode { + BlankNodeMatchingMode::Variable => Self::new( + self.active_graph.clone(), + self.graph_variable.clone(), + self.pattern.clone(), + ), + BlankNodeMatchingMode::Filter => Self::new_with_blank_nodes_as_filter( + self.active_graph.clone(), + self.graph_variable.clone(), + self.pattern.clone(), + ), + }; + Ok(cloned) + } +} diff --git a/lib/logical/src/quad_pattern/mod.rs b/lib/logical/src/quad_pattern/mod.rs new file mode 100644 index 00000000..d83fe35e --- /dev/null +++ b/lib/logical/src/quad_pattern/mod.rs @@ -0,0 +1,3 @@ +mod logical; + +pub use logical::*; diff --git a/lib/logical/src/quads/logical.rs b/lib/logical/src/quads/logical.rs deleted file mode 100644 index fd56074b..00000000 --- a/lib/logical/src/quads/logical.rs +++ /dev/null @@ -1,131 +0,0 @@ -use crate::active_graph::ActiveGraph; -use datafusion::common::{plan_err, DFSchemaRef}; -use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; -use rdf_fusion_encoding::typed_value::DEFAULT_QUAD_DFSCHEMA; -use rdf_fusion_model::NamedNode; -use spargebra::term::{Subject, Term}; -use std::cmp::Ordering; -use std::fmt; -use std::fmt::Formatter; - -/// TODO -/// -/// ### Planning QuadsNode -/// -/// Planning the QuadsNode requires users to define a specialized planner for the used storage -/// layer. This is because the planner should consider storage-specific problems like sharing a -/// transaction across multiple scans of the quads table in a single query. The built-in storage -/// layers of RdfFusion provide examples. -#[derive(PartialEq, Eq, Hash)] -pub struct QuadsNode { - active_graph: ActiveGraph, - subject: Option, - predicate: Option, - object: Option, -} - -impl QuadsNode { - /// TODO - pub fn new( - active_graph: ActiveGraph, - subject: Option, - predicate: Option, - object: Option, - ) -> Self { - Self { - active_graph, - subject, - predicate, - object, - } - } - - /// TODO - pub fn active_graph(&self) -> &ActiveGraph { - &self.active_graph - } - - /// TODO - pub fn subject(&self) -> Option<&Subject> { - self.subject.as_ref() - } - - /// TODO - pub fn predicate(&self) -> Option<&NamedNode> { - self.predicate.as_ref() - } - - /// TODO - pub fn object(&self) -> Option<&Term> { - self.object.as_ref() - } -} - -impl fmt::Debug for QuadsNode { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - UserDefinedLogicalNodeCore::fmt_for_explain(self, f) - } -} - -impl PartialOrd for QuadsNode { - fn partial_cmp(&self, _other: &Self) -> Option { - None - } -} - -impl UserDefinedLogicalNodeCore for QuadsNode { - fn name(&self) -> &str { - "Quads" - } - - fn inputs(&self) -> Vec<&LogicalPlan> { - vec![] - } - - fn schema(&self) -> &DFSchemaRef { - &DEFAULT_QUAD_DFSCHEMA - } - - fn expressions(&self) -> Vec { - vec![] - } - - fn fmt_for_explain(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "Quads")?; - - if let Some(subject) = &self.subject { - write!(f, " subject={subject}")?; - } - - if let Some(predicate) = &self.predicate { - write!(f, " predicate={predicate}")?; - } - - if let Some(object) = &self.object { - write!(f, " object={object}")?; - } - - Ok(()) - } - - fn with_exprs_and_inputs( - &self, - exprs: Vec, - inputs: Vec, - ) -> datafusion::common::Result { - if !inputs.is_empty() { - return plan_err!("QuadsNode has no inputs, got {}.", inputs.len()); - } - - if !exprs.is_empty() { - return plan_err!("QuadsNode has no expressions, got {}.", exprs.len()); - } - - Ok(Self::new( - self.active_graph.clone(), - self.subject.clone(), - self.predicate.clone(), - self.object.clone(), - )) - } -} diff --git a/lib/logical/src/quads/mod.rs b/lib/logical/src/quads/mod.rs deleted file mode 100644 index 1d79b5a9..00000000 --- a/lib/logical/src/quads/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod logical; - -pub use logical::QuadsNode; diff --git a/lib/model/Cargo.toml b/lib/model/Cargo.toml index fb39eca7..d320073d 100644 --- a/lib/model/Cargo.toml +++ b/lib/model/Cargo.toml @@ -9,9 +9,10 @@ rust-version.workspace = true [dependencies] # Dependencies to oxigraph's data model. We re-export them in this crate if we rely on it. To avoid any other crates # accidentally depending directly on oxiraph's data model, we do not include it in the workspace. -oxsdatatypes = "0.2.2" -oxiri = "0.2.11" -oxrdf = { version = "0.2.4", features = ["oxsdatatypes"] } +oxsdatatypes.workspace = true +oxiri.workspace = true +oxrdf.workspace = true +spargebra.workspace = true # Regular Dependencies thiserror.workspace = true diff --git a/lib/model/src/lib.rs b/lib/model/src/lib.rs index 5a207d57..ff22bc7a 100644 --- a/lib/model/src/lib.rs +++ b/lib/model/src/lib.rs @@ -17,3 +17,5 @@ pub use oxrdf::{ QuadRef, Subject, SubjectRef, Term, TermParseError, TermRef, Triple, TripleRef, Variable, VariableNameParseError, VariableRef, }; +pub use spargebra::algebra::PropertyPathExpression; +pub use spargebra::term::{GroundTerm, NamedNodePattern, TermPattern, TriplePattern}; diff --git a/lib/physical/src/lib.rs b/lib/physical/src/lib.rs index 8880ab13..45765b0e 100644 --- a/lib/physical/src/lib.rs +++ b/lib/physical/src/lib.rs @@ -1,4 +1,4 @@ extern crate core; pub mod paths; -pub mod quads; +pub mod quad_pattern; diff --git a/lib/physical/src/quad_pattern/mod.rs b/lib/physical/src/quad_pattern/mod.rs new file mode 100644 index 00000000..7a8432af --- /dev/null +++ b/lib/physical/src/quad_pattern/mod.rs @@ -0,0 +1,3 @@ +mod physical; + +pub use physical::QuadPatternExec; diff --git a/lib/physical/src/quads/physical.rs b/lib/physical/src/quad_pattern/physical.rs similarity index 58% rename from lib/physical/src/quads/physical.rs rename to lib/physical/src/quad_pattern/physical.rs index e305f8b9..75d3af36 100644 --- a/lib/physical/src/quads/physical.rs +++ b/lib/physical/src/quad_pattern/physical.rs @@ -1,12 +1,12 @@ -use datafusion::common::{internal_err, plan_err}; +use datafusion::common::{exec_err, internal_err, plan_err}; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; -use rdf_fusion_common::{DFResult, QuadPatternEvaluator}; -use rdf_fusion_encoding::typed_value::DEFAULT_QUAD_SCHEMA; +use rdf_fusion_common::{BlankNodeMatchingMode, DFResult, QuadPatternEvaluator}; +use rdf_fusion_logical::patterns::compute_schema_for_triple_pattern; use rdf_fusion_logical::EnumeratedActiveGraph; -use rdf_fusion_model::{NamedNode, Subject, Term}; +use rdf_fusion_model::{TriplePattern, Variable}; use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -15,29 +15,40 @@ use std::sync::Arc; /// /// Storage layers are expected to provide a custom planner that provides a custom quads_evaluator. #[derive(Debug, Clone)] -pub struct QuadsExec { +pub struct QuadPatternExec { /// The actual implementation of the storage layer. quads_evaluator: Arc, /// Contains a list of graph names. Each [GraphName] corresponds to a partition. active_graph: EnumeratedActiveGraph, + /// The graph variable. + graph_variable: Option, + /// The triple pattern to match. + triple_pattern: TriplePattern, + /// How to interpret blank nodes. + blank_node_mode: BlankNodeMatchingMode, /// The execution properties of this operator. plan_properties: PlanProperties, - subject: Option, - predicate: Option, - object: Option, } -impl QuadsExec { +impl QuadPatternExec { /// TODO pub fn new( quads_evaluator: Arc, active_graph: EnumeratedActiveGraph, - subject: Option, - predicate: Option, - object: Option, + graph_variable: Option, + triple_pattern: TriplePattern, + blank_node_mode: BlankNodeMatchingMode, ) -> Self { + let schema = Arc::clone( + compute_schema_for_triple_pattern( + graph_variable.as_ref().map(|v| v.as_ref()), + &triple_pattern, + blank_node_mode, + ) + .inner(), + ); let plan_properties = PlanProperties::new( - EquivalenceProperties::new(DEFAULT_QUAD_SCHEMA.clone()), + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(active_graph.0.len()), EmissionType::Incremental, Boundedness::Bounded, @@ -45,17 +56,17 @@ impl QuadsExec { Self { quads_evaluator, active_graph, + graph_variable, + triple_pattern, + blank_node_mode, plan_properties, - subject, - predicate, - object, } } } -impl ExecutionPlan for QuadsExec { +impl ExecutionPlan for QuadPatternExec { fn name(&self) -> &str { - "QuadsExec" + "QuadPatternExec" } fn as_any(&self) -> &dyn Any { @@ -75,7 +86,7 @@ impl ExecutionPlan for QuadsExec { children: Vec>, ) -> DFResult> { if !children.is_empty() { - return plan_err!("QuadsExec has no child, got {}", children.len()); + return plan_err!("QuadPatternExec has no child, got {}", children.len()); } Ok(Arc::new((*self).clone())) } @@ -93,36 +104,33 @@ impl ExecutionPlan for QuadsExec { ); } - self.quads_evaluator.quads_for_pattern( - self.active_graph.0[partition].as_ref(), - self.subject.as_ref().map(|s| s.as_ref()), - self.predicate.as_ref().map(|p| p.as_ref()), - self.object.as_ref().map(|o| o.as_ref()), + let result = self.quads_evaluator.evaluate_pattern( + self.active_graph.0[partition].clone(), + self.graph_variable.clone(), + self.triple_pattern.clone(), + self.blank_node_mode, context.session_config().batch_size(), - ) + )?; + if result.schema() != self.schema() { + return exec_err!("Unexpected schema for quad pattern stream."); + } + + Ok(result) } } -impl DisplayAs for QuadsExec { +impl DisplayAs for QuadPatternExec { fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "QuadsExec ({} Graphs)", + "QuadPatternExec ({} Graphs, ", self.plan_properties.partitioning.partition_count() )?; - if let Some(subject) = &self.subject { - write!(f, " subject={subject}")?; - } - - if let Some(predicate) = &self.predicate { - write!(f, " predicate={predicate}")?; - } - - if let Some(object) = &self.object { - write!(f, " object={object}")?; + if let Some(graph_variable) = &self.graph_variable { + write!(f, " {graph_variable}")?; } - Ok(()) + write!(f, " {}", &self.triple_pattern) } } diff --git a/lib/physical/src/quads/mod.rs b/lib/physical/src/quads/mod.rs deleted file mode 100644 index 2ff73bf3..00000000 --- a/lib/physical/src/quads/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod physical; - -pub use physical::QuadsExec; diff --git a/lib/rdf-fusion/src/store.rs b/lib/rdf-fusion/src/store.rs index 468d884b..f21458d3 100644 --- a/lib/rdf-fusion/src/store.rs +++ b/lib/rdf-fusion/src/store.rs @@ -271,13 +271,7 @@ impl Store { /// # }).unwrap(); /// ``` pub async fn stream(&self) -> Result { - let record_batch_stream = self - .engine - .quads_for_pattern(None, None, None, None) - .await - .map_err(QueryEvaluationError::from)?; - let solution_stream = QuerySolutionStream::new(QUAD_VARIABLES.clone(), record_batch_stream); - QuadStream::try_new(solution_stream).map_err(QueryEvaluationError::InternalError) + self.quads_for_pattern(None, None, None, None).await } /// Checks if this store contains a given quad. diff --git a/lib/storage/src/oxigraph_memory/planner.rs b/lib/storage/src/oxigraph_memory/planner.rs index ada9ab58..8e84d806 100644 --- a/lib/storage/src/oxigraph_memory/planner.rs +++ b/lib/storage/src/oxigraph_memory/planner.rs @@ -1,5 +1,5 @@ use crate::oxigraph_memory::encoded_term::EncodedTerm; -use crate::oxigraph_memory::quad_storage_stream::QuadIteratorBatchRecordStream; +use crate::oxigraph_memory::quad_storage_stream::QuadPatternBatchRecordStream; use crate::oxigraph_memory::store::MemoryStorageReader; use crate::MemoryQuadStorage; use async_trait::async_trait; @@ -8,16 +8,19 @@ use datafusion::error::Result as DFResult; use datafusion::execution::context::SessionState; use datafusion::execution::SendableRecordBatchStream; use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::{EmptyRecordBatchStream, ExecutionPlan}; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; -use rdf_fusion_common::QuadPatternEvaluator; -use rdf_fusion_logical::quads::QuadsNode; +use rdf_fusion_common::{BlankNodeMatchingMode, QuadPatternEvaluator}; +use rdf_fusion_logical::patterns::compute_schema_for_triple_pattern; +use rdf_fusion_logical::quad_pattern::QuadPatternNode; use rdf_fusion_logical::{ActiveGraph, EnumeratedActiveGraph}; -use rdf_fusion_model::{GraphName, GraphNameRef, NamedNodeRef, SubjectRef, TermRef}; -use rdf_fusion_physical::quads::QuadsExec; +use rdf_fusion_model::{ + GraphName, NamedNodePattern, Term, TermPattern, TriplePattern, Variable, VariableRef, +}; +use rdf_fusion_physical::quad_pattern::QuadPatternExec; use std::sync::Arc; -/// Planner for [QuadsNode]. +/// Planner for [QuadPatternNode]. pub struct OxigraphMemoryQuadNodePlanner { /// The implementation of the quad pattern evaluator. snapshot: MemoryStorageReader, @@ -65,7 +68,7 @@ impl OxigraphMemoryQuadNodePlanner { #[async_trait] impl ExtensionPlanner for OxigraphMemoryQuadNodePlanner { - /// Converts a logical [QuadsNode] into its physical execution plan + /// Converts a logical [QuadPatternNode] into its physical execution plan async fn plan_extension( &self, _planner: &dyn PhysicalPlanner, @@ -74,15 +77,22 @@ impl ExtensionPlanner for OxigraphMemoryQuadNodePlanner { _physical_inputs: &[Arc], _session_state: &SessionState, ) -> DFResult>> { - if let Some(node) = node.as_any().downcast_ref::() { - let active_graph = self.enumerate_active_graph(node.active_graph())?; - let quads = Arc::new(QuadsExec::new( + if let Some(quad_pattern_node) = node.as_any().downcast_ref::() { + let active_graph = self.enumerate_active_graph(quad_pattern_node.active_graph())?; + let quads = Arc::new(QuadPatternExec::new( Arc::new(self.snapshot.clone()), active_graph, - node.subject().cloned(), - node.predicate().cloned(), - node.object().cloned(), + quad_pattern_node + .graph_variable() + .map(VariableRef::into_owned), + quad_pattern_node.pattern().clone(), + quad_pattern_node.blank_node_mode(), )); + + if node.schema().inner().as_ref() != quads.schema().as_ref() { + return plan_err!("Schema does not match after planning QuadPatternExec."); + } + Ok(Some(quads)) } else { Ok(None) @@ -92,22 +102,65 @@ impl ExtensionPlanner for OxigraphMemoryQuadNodePlanner { #[async_trait] impl QuadPatternEvaluator for MemoryStorageReader { - fn quads_for_pattern( + fn evaluate_pattern( &self, - graph: GraphNameRef<'_>, - subject: Option>, - predicate: Option>, - object: Option>, + graph: GraphName, + graph_variable: Option, + pattern: TriplePattern, + blank_node_mode: BlankNodeMatchingMode, batch_size: usize, ) -> rdf_fusion_common::DFResult { + let subject = match &pattern.subject { + TermPattern::NamedNode(nn) => Some(Term::NamedNode(nn.clone())), + TermPattern::BlankNode(bnode) if blank_node_mode == BlankNodeMatchingMode::Filter => { + Some(Term::BlankNode(bnode.clone())) + } + TermPattern::Literal(_) => { + // If the subject is a literal, then the result is always empty. + let schema = compute_schema_for_triple_pattern( + graph_variable.as_ref().map(|v| v.as_ref()), + &pattern, + blank_node_mode, + ); + return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( + schema.inner(), + )))); + } + _ => None, + }; + let predicate = match &pattern.predicate { + NamedNodePattern::NamedNode(nn) => Some(Term::NamedNode(nn.clone())), + NamedNodePattern::Variable(_) => None, + }; + let object = match &pattern.object { + TermPattern::NamedNode(nn) => Some(Term::NamedNode(nn.clone())), + TermPattern::BlankNode(bnode) if blank_node_mode == BlankNodeMatchingMode::Filter => { + Some(Term::BlankNode(bnode.clone())) + } + TermPattern::Literal(lit) => Some(Term::Literal(lit.clone())), + _ => None, + }; + let iterator = self.quads_for_pattern( - subject.map(EncodedTerm::from).as_ref(), - predicate.map(EncodedTerm::from).as_ref(), - object.map(EncodedTerm::from).as_ref(), - Some(&EncodedTerm::from(graph)), + Some(&EncodedTerm::from(graph.as_ref())), + subject.as_ref().map(|t| t.as_ref().into()).as_ref(), + predicate + .as_ref() + .map(|t| t.as_ref()) + .map(EncodedTerm::from) + .as_ref(), + object + .as_ref() + .map(|t| t.as_ref()) + .map(EncodedTerm::from) + .as_ref(), ); - Ok(Box::pin(QuadIteratorBatchRecordStream::new( - iterator, batch_size, + Ok(Box::pin(QuadPatternBatchRecordStream::new( + iterator, + graph_variable, + pattern, + blank_node_mode, + batch_size, ))) } } diff --git a/lib/storage/src/oxigraph_memory/quad_storage_stream.rs b/lib/storage/src/oxigraph_memory/quad_storage_stream.rs index a72943af..807ec6f5 100644 --- a/lib/storage/src/oxigraph_memory/quad_storage_stream.rs +++ b/lib/storage/src/oxigraph_memory/quad_storage_stream.rs @@ -3,53 +3,104 @@ use crate::oxigraph_memory::encoder::EncodedQuad; use crate::oxigraph_memory::store::QuadIterator; use crate::AResult; use datafusion::arrow::array::{Array, RecordBatch, RecordBatchOptions}; -use datafusion::arrow::datatypes::Schema; -use datafusion::common::DataFusionError; +use datafusion::arrow::datatypes::{Field, Schema}; +use datafusion::common::{Column, DataFusionError}; use datafusion::execution::RecordBatchStream; use futures::Stream; -use rdf_fusion_common::DFResult; -use rdf_fusion_encoding::plain_term::PlainTermArrayBuilder; -use rdf_fusion_encoding::typed_value::DEFAULT_QUAD_SCHEMA; -use rdf_fusion_model::TermRef; +use rdf_fusion_common::{BlankNodeMatchingMode, DFResult}; +use rdf_fusion_encoding::plain_term::{PlainTermArrayBuilder, PlainTermEncoding}; +use rdf_fusion_encoding::TermEncoding; +use rdf_fusion_logical::patterns::compute_schema_for_triple_pattern; +use rdf_fusion_model::{NamedNodePattern, TermPattern, TermRef, TriplePattern, Variable}; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::task::{Context, Poll}; /// Stream that generates record batches on demand -pub struct QuadIteratorBatchRecordStream { +pub struct QuadPatternBatchRecordStream { + schema: Arc, iterator: QuadIterator, + graph_variable: Option, + pattern: TriplePattern, + blank_node_mode: BlankNodeMatchingMode, batch_size: usize, + equalities: Option, } -impl QuadIteratorBatchRecordStream { +impl QuadPatternBatchRecordStream { /// TODO - pub fn new(iterator: QuadIterator, batch_size: usize) -> Self { + pub fn new( + iterator: QuadIterator, + graph_variable: Option, + pattern: TriplePattern, + blank_node_mode: BlankNodeMatchingMode, + batch_size: usize, + ) -> Self { + let schema = Arc::clone( + compute_schema_for_triple_pattern( + graph_variable.as_ref().map(|v| v.as_ref()), + &pattern, + blank_node_mode, + ) + .inner(), + ); + let equalities = + QuadEqualities::try_new(graph_variable.as_ref(), &pattern, blank_node_mode); Self { + schema, iterator, + graph_variable, + pattern, + blank_node_mode, batch_size, + equalities, } } + + /// Creates a builder for the record batches. + fn create_builder(&self) -> RdfQuadsRecordBatchBuilder { + let [graph, subject, predicate, object] = extract_columns( + self.graph_variable.as_ref(), + &self.pattern, + self.blank_node_mode, + ); + RdfQuadsRecordBatchBuilder::new(graph, subject, predicate, object, self.batch_size) + } } -impl Stream for QuadIteratorBatchRecordStream { +impl Stream for QuadPatternBatchRecordStream { type Item = DFResult; fn poll_next( mut self: std::pin::Pin<&mut Self>, _ctx: &mut Context<'_>, ) -> Poll> { - // Early return if the iterator is empty - let Some(first_quad) = self.iterator.next() else { - return Poll::Ready(None); - }; + let mut exhausted = false; + let mut rb_builder = self.create_builder(); + + let mut remaining_items = self.batch_size; + let mut buffer: [Option; 32] = [const { None }; 32]; + while !exhausted && remaining_items > 0 { + for element in &mut buffer { + let Some(quad) = self.iterator.next() else { + exhausted = true; + break; + }; + *element = Some(quad); + } + + if let Some(equalities) = &self.equalities { + equalities.filter(&mut buffer); + } + + let encoded = rb_builder.encode_batch(&buffer); + remaining_items -= encoded; - let mut rb_builder = RdfQuadsRecordBatchBuilder::new(self.batch_size); - rb_builder.encode_quad(&first_quad); + buffer.fill(None); + } - for _ in 0..(self.batch_size - 1) { - let Some(quad) = self.iterator.next() else { - break; - }; - rb_builder.encode_quad(&quad); + if rb_builder.count == 0 { + return Poll::Ready(None); } let record_batch = rb_builder.finish(); @@ -60,55 +111,198 @@ impl Stream for QuadIteratorBatchRecordStream { } } -impl RecordBatchStream for QuadIteratorBatchRecordStream { +impl RecordBatchStream for QuadPatternBatchRecordStream { fn schema(&self) -> Arc { - Arc::clone(&DEFAULT_QUAD_SCHEMA) + Arc::clone(&self.schema) } } #[allow(clippy::struct_excessive_bools)] struct RdfQuadsRecordBatchBuilder { - graph: PlainTermArrayBuilder, - subject: PlainTermArrayBuilder, - predicate: PlainTermArrayBuilder, - object: PlainTermArrayBuilder, + graph: Option<(Column, PlainTermArrayBuilder)>, + subject: Option<(Column, PlainTermArrayBuilder)>, + predicate: Option<(Column, PlainTermArrayBuilder)>, + object: Option<(Column, PlainTermArrayBuilder)>, count: usize, } impl RdfQuadsRecordBatchBuilder { - fn new(capacity: usize) -> Self { + fn new( + mut graph: Option, + mut subject: Option, + mut predicate: Option, + mut object: Option, + capacity: usize, + ) -> Self { + let mut seen = HashSet::new(); + deduplicate(&mut seen, &mut graph); + deduplicate(&mut seen, &mut subject); + deduplicate(&mut seen, &mut predicate); + deduplicate(&mut seen, &mut object); + Self { - graph: PlainTermArrayBuilder::new(capacity), - subject: PlainTermArrayBuilder::new(capacity), - predicate: PlainTermArrayBuilder::new(capacity), - object: PlainTermArrayBuilder::new(capacity), + graph: graph.map(|v| (v, PlainTermArrayBuilder::new(capacity))), + subject: subject.map(|v| (v, PlainTermArrayBuilder::new(capacity))), + predicate: predicate.map(|v| (v, PlainTermArrayBuilder::new(capacity))), + object: object.map(|v| (v, PlainTermArrayBuilder::new(capacity))), count: 0, } } - fn encode_quad(&mut self, quad: &EncodedQuad) { - encode_term(&mut self.graph, &quad.graph_name); - encode_term(&mut self.subject, &quad.subject); - encode_term(&mut self.predicate, &quad.predicate); - encode_term(&mut self.object, &quad.object); - self.count += 1; + #[allow( + clippy::expect_used, + reason = "Checked via count, Maybe use unsafe if performance is an issue" + )] + fn encode_batch(&mut self, quads: &[Option; 32]) -> usize { + let count = quads.iter().position(Option::is_none).unwrap_or(32); + + if let Some((_, builder)) = &mut self.graph { + for quad in quads.iter().take(count) { + let value = &quad.as_ref().expect("Checked via count").graph_name; + encode_term(builder, value); + } + } + + if let Some((_, builder)) = &mut self.subject { + for quad in quads.iter().take(count) { + let value = &quad.as_ref().expect("Checked via count").subject; + encode_term(builder, value); + } + } + + if let Some((_, builder)) = &mut self.predicate { + for quad in quads.iter().take(count) { + let value = &quad.as_ref().expect("Checked via count").predicate; + encode_term(builder, value); + } + } + + if let Some((_, builder)) = &mut self.object { + for quad in quads.iter().take(count) { + let value = &quad.as_ref().expect("Checked via count").object; + encode_term(builder, value); + } + } + + self.count += count; + count } fn finish(self) -> AResult { - let fields: Vec> = vec![ - Arc::new(self.graph.finish()), - Arc::new(self.subject.finish()), - Arc::new(self.predicate.finish()), - Arc::new(self.object.finish()), - ]; + fn try_add_column( + fields: &mut Vec, + arrays: &mut Vec>, + column: Option<(Column, PlainTermArrayBuilder)>, + nullable: bool, + ) { + if let Some((var, builder)) = column { + fields.push(Field::new( + var.name(), + PlainTermEncoding::data_type(), + nullable, + )); + arrays.push(builder.finish()); + } + } + + let mut fields: Vec = Vec::new(); + let mut arrays: Vec> = Vec::new(); + + try_add_column(&mut fields, &mut arrays, self.graph, true); + try_add_column(&mut fields, &mut arrays, self.subject, false); + try_add_column(&mut fields, &mut arrays, self.predicate, false); + try_add_column(&mut fields, &mut arrays, self.object, false); + let schema = Arc::new(Schema::new(fields)); let options = RecordBatchOptions::default().with_row_count(Some(self.count)); - let record_batch = - RecordBatch::try_new_with_options(Arc::clone(&DEFAULT_QUAD_SCHEMA), fields, &options)?; + let record_batch = RecordBatch::try_new_with_options(schema, arrays, &options)?; Ok(record_batch) } } +struct QuadEqualities(Vec<[u8; 4]>); + +impl QuadEqualities { + /// Creates a new [QuadEqualities] fom the variables of the quads. + fn try_new( + graph_variable: Option<&Variable>, + pattern: &TriplePattern, + blank_node_matching_mode: BlankNodeMatchingMode, + ) -> Option { + let vars = extract_columns(graph_variable, pattern, blank_node_matching_mode); + + let mut mapping: HashMap<&Column, [u8; 4]> = HashMap::new(); + + for i in 0..4 { + if let Some(var) = &vars[i] { + let value = mapping.entry(var).or_insert([0; 4]); + value[i] = 1; + } + } + + let equalities = mapping + .into_iter() + .filter_map(|(_, vars)| { + let has_equality = vars.into_iter().filter(|v| *v == 1).count() > 1; + has_equality.then_some(vars) + }) + .collect::>(); + + if equalities.is_empty() { + None + } else { + Some(Self(equalities)) + } + } + + /// Filters the buffer in-place and fills up holes by moving all non-None entries to the front. + /// After this, all `None` slots (holes) will be at the end of the buffer. + fn filter(&self, quads: &mut [Option; 32]) { + let mut write_idx = 0; + + // Iterate over the buffer and write any matching quad to the write position. + for read_idx in 0..quads.len() { + if let Some(quad) = &mut quads[read_idx] { + if self.evaluate(quad) { + if write_idx != read_idx { + quads[write_idx] = quads[read_idx].take(); + } + write_idx += 1; + } + } + } + + // Fill the rest with None + for quad in quads.iter_mut().skip(write_idx) { + *quad = None; + } + } + + /// Evaluates whether the equalities hold for `quad`. + fn evaluate(&self, quad: &EncodedQuad) -> bool { + for equality in &self.0 { + for i in 0..4 { + for j in (i + 1)..4 { + if equality[i] == 1 && equality[j] == 1 { + let quad = [ + &quad.graph_name, + &quad.subject, + &quad.predicate, + &quad.object, + ]; + + if quad[i] != quad[j] { + return false; + } + } + } + } + } + + true + } +} + fn encode_term(builder: &mut PlainTermArrayBuilder, term: &EncodedTerm) { let term_ref = match term { EncodedTerm::DefaultGraph => None, @@ -121,3 +315,47 @@ fn encode_term(builder: &mut PlainTermArrayBuilder, term: &EncodedTerm) { Some(term_ref) => builder.append_term(term_ref), } } + +/// Returns a buffer of optional variables from `graph_variable` and `pattern`. +#[allow( + clippy::match_wildcard_for_single_variants, + reason = "We are only interested in variables" +)] +fn extract_columns( + graph_variable: Option<&Variable>, + pattern: &TriplePattern, + blank_node_mode: BlankNodeMatchingMode, +) -> [Option; 4] { + [ + graph_variable + .cloned() + .map(|v| Column::new_unqualified(v.as_str())), + match &pattern.subject { + TermPattern::Variable(v) => Some(Column::new_unqualified(v.as_str())), + TermPattern::BlankNode(bnode) if blank_node_mode == BlankNodeMatchingMode::Variable => { + Some(Column::new_unqualified(bnode.as_str())) + } + _ => None, + }, + match &pattern.predicate { + NamedNodePattern::Variable(v) => Some(Column::new_unqualified(v.as_str())), + _ => None, + }, + match &pattern.object { + TermPattern::Variable(v) => Some(Column::new_unqualified(v.as_str())), + TermPattern::BlankNode(bnode) if blank_node_mode == BlankNodeMatchingMode::Variable => { + Some(Column::new_unqualified(bnode.as_str())) + } + _ => None, + }, + ] +} + +fn deduplicate(seen: &mut HashSet, value: &mut Option) { + if let Some(value_taken) = value.take() { + if !seen.contains(&value_taken) { + seen.insert(value_taken.clone()); + *value = Some(value_taken); + } + } +} diff --git a/lib/storage/src/oxigraph_memory/store.rs b/lib/storage/src/oxigraph_memory/store.rs index f020bee1..c0ae5e8d 100644 --- a/lib/storage/src/oxigraph_memory/store.rs +++ b/lib/storage/src/oxigraph_memory/store.rs @@ -168,10 +168,10 @@ impl MemoryStorageReader { #[allow(clippy::same_name_method)] pub fn quads_for_pattern( &self, + graph_name: Option<&EncodedTerm>, subject: Option<&EncodedTerm>, predicate: Option<&EncodedTerm>, object: Option<&EncodedTerm>, - graph_name: Option<&EncodedTerm>, ) -> QuadIterator { fn get_start_and_count( map: &DashMap, u64), BuildHasherDefault>, diff --git a/testsuite/oxigraph-tests/parser-error/invalid_iri_error.txt b/testsuite/oxigraph-tests/parser-error/invalid_iri_error.txt index 26729063..d4b100c5 100644 --- a/testsuite/oxigraph-tests/parser-error/invalid_iri_error.txt +++ b/testsuite/oxigraph-tests/parser-error/invalid_iri_error.txt @@ -1 +1 @@ -Parser error between at line 2 between columns 24 and column 36: Invalid IRI code point ' ' \ No newline at end of file +Parser error at line 2 between columns 24 and 36: Invalid IRI code point ' ' \ No newline at end of file diff --git a/testsuite/oxigraph-tests/parser-error/invalid_predicate_error.txt b/testsuite/oxigraph-tests/parser-error/invalid_predicate_error.txt index 469dd19f..1abf4e46 100644 --- a/testsuite/oxigraph-tests/parser-error/invalid_predicate_error.txt +++ b/testsuite/oxigraph-tests/parser-error/invalid_predicate_error.txt @@ -1 +1 @@ -Parser error between at line 2 between columns 24 and column 27: "p" is not a valid predicate \ No newline at end of file +Parser error at line 2 between columns 24 and 27: "p" is not a valid predicate \ No newline at end of file diff --git a/testsuite/oxigraph-tests/parser-error/invalid_string_escape_error.txt b/testsuite/oxigraph-tests/parser-error/invalid_string_escape_error.txt index f5e45857..639e3f14 100644 --- a/testsuite/oxigraph-tests/parser-error/invalid_string_escape_error.txt +++ b/testsuite/oxigraph-tests/parser-error/invalid_string_escape_error.txt @@ -1 +1 @@ -Parser error between at line 1 between columns 53 and column 55: Unexpected escape character '\a' \ No newline at end of file +Parser error at line 1 between columns 53 and 55: Unexpected escape character '\a' \ No newline at end of file