Skip to content

Commit 2d82a75

Browse files
committed
save
1 parent 575db61 commit 2d82a75

File tree

24 files changed

+311
-264
lines changed

24 files changed

+311
-264
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::{bail, Context};
44
use clap::Parser;
55
use graphfusion::io::{RdfFormat, RdfParser, RdfSerializer};
66
use graphfusion::model::{GraphName, NamedNode};
7-
use graphfusion::store::Store;
7+
use graphfusion_engine::store::Store;
88
use graphfusion_web::ServerConfig;
99
use std::ffi::OsStr;
1010
use std::fs::File;

lib/graphfusion-engine/src/engine.rs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
use crate::planner::GraphFusionPlanner;
2+
use crate::sparql::error::QueryEvaluationError;
3+
use crate::sparql::{evaluate_query, Query, QueryExplanation, QueryOptions, QueryResults};
4+
use crate::DFResult;
5+
use arrow_rdf::TABLE_QUADS;
6+
use datafusion::dataframe::DataFrame;
7+
use datafusion::datasource::{DefaultTableSource, TableProvider};
8+
use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
9+
use datafusion::functions_aggregate::first_last::FirstValue;
10+
use datafusion::logical_expr::{AggregateUDF, LogicalPlan, LogicalPlanBuilder};
11+
use datafusion::prelude::SessionContext;
12+
use graphfusion_logical::paths::PathToJoinsRule;
13+
use graphfusion_logical::patterns::{
14+
compute_filters_for_pattern, PatternNode, PatternToProjectionRule,
15+
};
16+
use model::{BlankNode, GraphNameRef, NamedNodeRef, QuadRef, SubjectRef, TermRef};
17+
use spargebra::term::TermPattern;
18+
use std::sync::Arc;
19+
20+
pub struct GraphFusionInstance {
21+
/// The DataFusion [SessionContext].
22+
ctx: SessionContext,
23+
/// The storage that backs this instance.
24+
storage: Arc<dyn TableProvider>,
25+
}
26+
27+
impl GraphFusionInstance {
28+
/// Creates a new [GraphFusionInstance] with the default configuration and the given `storage`.
29+
pub fn with_storage(storage: Arc<dyn TableProvider>) -> DFResult<Self> {
30+
let state = SessionStateBuilder::new()
31+
.with_query_planner(Arc::new(GraphFusionPlanner))
32+
.with_aggregate_functions(vec![AggregateUDF::from(FirstValue::new()).into()])
33+
.with_optimizer_rule(Arc::new(PathToJoinsRule::new(Arc::clone(&storage))))
34+
.with_optimizer_rule(Arc::new(PatternToProjectionRule))
35+
.build();
36+
37+
let session_context = SessionContext::from(state);
38+
session_context.register_table("quads", Arc::clone(&storage))?;
39+
40+
Ok(Self {
41+
ctx: session_context,
42+
storage,
43+
})
44+
}
45+
46+
/// Checks whether `quad` is contained in the instance.
47+
async fn contains(&self, quad: &QuadRef<'_>) -> DFResult<bool> {
48+
let pattern_plan = self
49+
.match_pattern(
50+
Some(quad.graph_name),
51+
Some(quad.subject),
52+
Some(quad.predicate),
53+
Some(quad.object),
54+
)
55+
.await?;
56+
57+
let count = DataFrame::new(self.ctx.state(), pattern_plan)
58+
.count()
59+
.await?;
60+
61+
Ok(count > 0)
62+
}
63+
64+
/// Returns the number of quads in the instance.
65+
async fn len(&self) -> DFResult<usize> {
66+
self.ctx.table(TABLE_QUADS).await?.count().await
67+
}
68+
69+
/// Returns a stream of all quads that match the given pattern.
70+
async fn quads_for_pattern(
71+
&self,
72+
graph_name: Option<GraphNameRef<'_>>,
73+
subject: Option<SubjectRef<'_>>,
74+
predicate: Option<NamedNodeRef<'_>>,
75+
object: Option<TermRef<'_>>,
76+
) -> DFResult<SendableRecordBatchStream> {
77+
let plan = self
78+
.match_pattern(graph_name, subject, predicate, object)
79+
.await?;
80+
let result = DataFrame::new(self.ctx.state(), plan)
81+
.execute_stream()
82+
.await?;
83+
Ok(result)
84+
}
85+
86+
/// Evaluates a SPARQL [Query] over the instance.
87+
async fn execute_query(
88+
&self,
89+
query: &Query,
90+
options: QueryOptions,
91+
) -> Result<(QueryResults, Option<QueryExplanation>), QueryEvaluationError> {
92+
evaluate_query(&self.ctx, query, options).await
93+
}
94+
95+
/// Creates a [LogicalPlan] for computing all quads that match the given pattern.
96+
async fn match_pattern(
97+
&self,
98+
graph_name: Option<GraphNameRef<'_>>,
99+
subject: Option<SubjectRef<'_>>,
100+
predicate: Option<NamedNodeRef<'_>>,
101+
object: Option<TermRef<'_>>,
102+
) -> DFResult<LogicalPlan> {
103+
let plan = LogicalPlanBuilder::scan(
104+
TABLE_QUADS,
105+
Arc::new(DefaultTableSource::new(Arc::clone(&self.storage))),
106+
None,
107+
)?;
108+
109+
let graph_name = graph_name
110+
.map(|g| TermPattern::from(g.to_owned()))
111+
.unwrap_or(TermPattern::BlankNode(BlankNode::default()));
112+
let subject = subject
113+
.map(|s| TermPattern::from(s.into_owned()))
114+
.unwrap_or(TermPattern::BlankNode(BlankNode::default()));
115+
let predicate = predicate
116+
.map(|p| TermPattern::from(p.into_owned()))
117+
.unwrap_or(TermPattern::BlankNode(BlankNode::default()));
118+
let object = object
119+
.map(|o| TermPattern::from(o.into_owned()))
120+
.unwrap_or(TermPattern::BlankNode(BlankNode::default()));
121+
let pattern_node = PatternNode::try_new(
122+
plan.clone().build()?,
123+
vec![graph_name, subject, predicate, object],
124+
)?;
125+
let filter = compute_filters_for_pattern(&pattern_node);
126+
127+
let plan = match filter {
128+
None => plan.build()?,
129+
Some(filter) => plan.filter(filter)?.build()?,
130+
};
131+
}
132+
}

lib/graphfusion-engine/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ extern crate core;
33
pub mod error;
44
pub mod results;
55
pub mod sparql;
6-
mod triple_store;
6+
mod quad_storage;
7+
mod planner;
8+
mod engine;
79

8-
pub use triple_store::QuadStorage;
10+
pub use quad_storage::QuadStorage;
11+
pub use engine::GraphFusionInstance;
912

1013
type DFResult<T> = datafusion::error::Result<T>;

lib/graphfusion-physical/src/planner.rs renamed to lib/graphfusion-engine/src/planner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use crate::paths::KleenePlusPathPlanner;
21
use async_trait::async_trait;
32
use datafusion::execution::context::QueryPlanner;
43
use datafusion::execution::SessionState;
54
use datafusion::logical_expr::LogicalPlan;
65
use datafusion::physical_plan::ExecutionPlan;
76
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
7+
use graphfusion_physical::paths::KleenePlusPathPlanner;
88
use std::sync::Arc;
99

1010
#[derive(Debug, Default)]
@@ -18,7 +18,7 @@ impl QueryPlanner for GraphFusionPlanner {
1818
session_state: &SessionState,
1919
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
2020
let physical_planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
21-
KleenePlusPathPlanner::new(),
21+
KleenePlusPathPlanner,
2222
)]);
2323
physical_planner
2424
.create_physical_plan(logical_plan, session_state)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use std::sync::Arc;
2+
use async_trait::async_trait;
3+
use datafusion::common::DataFusionError;
4+
use datafusion::datasource::TableProvider;
5+
use model::{Quad, QuadRef};
6+
7+
#[async_trait]
8+
#[allow(clippy::len_without_is_empty)]
9+
pub trait QuadStorage {
10+
/// Returns the table name of this [QuadStorage]. This name is used to register a table in the
11+
/// DataFusion engine.
12+
async fn table_name(&self) -> &str;
13+
14+
/// Returns the [TableProvider] for this [QuadStorage]. This provider is registered in the
15+
/// DataFusion session and used for planning the execution of queries.
16+
async fn table_provider(&self) -> Arc<dyn TableProvider>;
17+
18+
/// Loads the given quads into the storage.
19+
async fn load_quads(&self, quads: Vec<Quad>) -> Result<usize, DataFusionError>;
20+
21+
/// Removes the given quad from the storage.
22+
async fn remove<'a>(&self, quad: QuadRef<'_>) -> Result<bool, DataFusionError>;
23+
}

lib/graphfusion-engine/src/sparql/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! [SPARQL](https://www.w3.org/TR/sparql11-overview/) implementation.
22
//!
3-
//! Stores execute SPARQL. See [`Store`](crate::store::Store::query()) for an example.
3+
//! Stores execute SPARQL. See [`Store`](graphfusion::store::Store::query()) for an example.
44
55
mod algebra;
66
pub mod error;

lib/graphfusion-engine/src/triple_store.rs

Lines changed: 0 additions & 44 deletions
This file was deleted.

lib/graphfusion-logical/src/patterns/logical.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,18 @@ use spargebra::term::TermPattern;
88
use std::cmp::Ordering;
99
use std::collections::HashMap;
1010
use std::fmt;
11+
use std::fmt::Display;
1112
use std::sync::Arc;
1213

1314
#[derive(PartialEq, Eq, Hash)]
1415
pub struct PatternNode {
1516
input: LogicalPlan,
16-
patterns: Vec<TermPattern>,
17+
patterns: Vec<dyn PatternElement>,
1718
schema: DFSchemaRef,
1819
}
1920

2021
impl PatternNode {
21-
pub fn try_new(input: LogicalPlan, patterns: Vec<TermPattern>) -> DFResult<Self> {
22+
pub fn try_new(input: LogicalPlan, patterns: Vec<dyn PatternElement>) -> DFResult<Self> {
2223
if input.schema().columns().len() != patterns.len() {
2324
return plan_err!("Patterns must match the number of column of inner.");
2425
}
Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
11
mod logical;
22
mod rewrite;
3+
mod pattern_element;
34

45
pub use logical::PatternNode;
6+
pub use rewrite::compute_filters_for_pattern;
57
pub use rewrite::PatternToProjectionRule;
6-
use spargebra::term::TermPattern;
7-
8-
fn pattern_to_variable_name(pattern: &TermPattern) -> Option<String> {
9-
match pattern {
10-
TermPattern::BlankNode(bnode) => Some(format!("_:{}", bnode.as_ref().as_str())),
11-
TermPattern::Variable(var) => Some(var.as_str().into()),
12-
_ => None,
13-
}
14-
}

0 commit comments

Comments
 (0)