Skip to content

Add PartiqlSharedCatalog for Send+Sync catalog usage. #583

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- Changed many internal `HashMap`s to use `rustc-hash`'s `FxHash`
- *BREAKING* Refactors `Catalog` to allow `Send`+`Sync` for re-use.

### Added

Expand Down
7 changes: 4 additions & 3 deletions extension/partiql-extension-csv/tests/scan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use partiql_catalog::catalog::{Catalog, PartiqlCatalog};
use partiql_catalog::catalog::{PartiqlCatalog, SharedCatalog};
use partiql_catalog::context::SystemContext;
use partiql_catalog::extension::Extension;
use partiql_eval::env::basic::MapBindings;
Expand All @@ -19,7 +19,7 @@ pub(crate) fn parse(statement: &str) -> ParserResult<'_> {
#[track_caller]
#[inline]
pub(crate) fn lower(
catalog: &dyn Catalog,
catalog: &dyn SharedCatalog,
parsed: &Parsed<'_>,
) -> partiql_logical::LogicalPlan<partiql_logical::BindingsOp> {
let planner = partiql_logical_planner::LogicalPlanner::new(catalog);
Expand All @@ -29,7 +29,7 @@ pub(crate) fn lower(
#[track_caller]
#[inline]
pub(crate) fn evaluate(
catalog: &dyn Catalog,
catalog: &dyn SharedCatalog,
logical: partiql_logical::LogicalPlan<partiql_logical::BindingsOp>,
bindings: MapBindings<Value>,
) -> (Value, Vec<EvaluationError>) {
Expand Down Expand Up @@ -61,6 +61,7 @@ pub(crate) fn evaluate_with_csv_scan(
let ext = CsvExtension {};
ext.load(&mut catalog)
.expect("ion extension load to succeed");
let catalog = catalog.to_shared_catalog();

let parsed = parse(statement);
let lowered = lower(&catalog, &parsed.expect("parse"));
Expand Down
7 changes: 4 additions & 3 deletions extension/partiql-extension-ion-functions/tests/scan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use partiql_catalog::catalog::{Catalog, PartiqlCatalog};
use partiql_catalog::catalog::{PartiqlCatalog, SharedCatalog};
use partiql_catalog::context::SystemContext;
use partiql_catalog::extension::Extension;
use partiql_eval::env::basic::MapBindings;
Expand All @@ -19,7 +19,7 @@ pub(crate) fn parse(statement: &str) -> ParserResult<'_> {
#[track_caller]
#[inline]
pub(crate) fn lower(
catalog: &dyn Catalog,
catalog: &dyn SharedCatalog,
parsed: &Parsed<'_>,
) -> partiql_logical::LogicalPlan<partiql_logical::BindingsOp> {
let planner = partiql_logical_planner::LogicalPlanner::new(catalog);
Expand All @@ -29,7 +29,7 @@ pub(crate) fn lower(
#[track_caller]
#[inline]
pub(crate) fn evaluate(
catalog: &dyn Catalog,
catalog: &dyn SharedCatalog,
logical: partiql_logical::LogicalPlan<partiql_logical::BindingsOp>,
bindings: MapBindings<Value>,
) -> (Value, Vec<EvaluationError>) {
Expand Down Expand Up @@ -61,6 +61,7 @@ pub(crate) fn evaluate_with_ion_scan(
let ext = IonExtension {};
ext.load(&mut catalog)
.expect("ion extension load to succeed");
let catalog = catalog.to_shared_catalog();

let parsed = parse(statement);
let lowered = lower(&catalog, &parsed.expect("parse"));
Expand Down
6 changes: 3 additions & 3 deletions partiql-ast-passes/src/name_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use indexmap::{IndexMap, IndexSet};
use partiql_ast::ast;
use partiql_ast::ast::{GraphPattern, GroupByExpr, GroupKey};
use partiql_ast::visit::{Traverse, Visit, Visitor};
use partiql_catalog::catalog::Catalog;
use partiql_catalog::catalog::SharedCatalog;
use partiql_common::node::NodeId;
use std::sync::atomic::{AtomicU32, Ordering};

Expand Down Expand Up @@ -102,11 +102,11 @@ pub struct NameResolver<'c> {

// errors that occur during name resolution
errors: Vec<AstTransformError>,
catalog: &'c dyn Catalog,
catalog: &'c dyn SharedCatalog,
}

impl<'c> NameResolver<'c> {
pub fn new(catalog: &'c dyn Catalog) -> Self {
pub fn new(catalog: &'c dyn SharedCatalog) -> Self {
NameResolver {
// environment stack tracking
id_path_to_root: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion partiql-catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ ordered-float = "5"
itertools = "0.14"
unicase = "2.7"
rustc-hash = "2"

delegate = "0.13"
dyn-clone = "1"
39 changes: 33 additions & 6 deletions partiql-catalog/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::call_defs::ScalarFnCallSpecs;
use crate::scalar_fn::ScalarFunction;
use crate::table_fn::TableFunction;
use delegate::delegate;
use partiql_common::catalog::{CatalogId, EntryId, ObjectId};
use partiql_types::PartiqlShape;
use rustc_hash::FxHashMap;
Expand Down Expand Up @@ -42,19 +43,22 @@ pub enum CatalogErrorKind {
Unknown,
}

pub trait Catalog: Debug {
pub trait MutableCatalog: Debug {
fn add_table_function(&mut self, info: TableFunction) -> Result<ObjectId, CatalogError>;
fn add_scalar_function(&mut self, info: ScalarFunction) -> Result<ObjectId, CatalogError>;

fn add_type_entry(&mut self, entry: TypeEnvEntry<'_>) -> Result<ObjectId, CatalogError>;
}

pub trait ReadOnlyCatalog: Debug {
fn get_function(&self, name: &str) -> Option<FunctionEntry<'_>>;

fn get_function_by_id(&self, id: ObjectId) -> Option<FunctionEntry<'_>>;

fn resolve_type(&self, name: &str) -> Option<TypeEntry>;
}

pub trait SharedCatalog: ReadOnlyCatalog + Send + Sync {}

pub trait Catalog: MutableCatalog + ReadOnlyCatalog {}

#[derive(Debug)]
pub struct TypeEnvEntry<'a> {
name: UniCase<String>,
Expand Down Expand Up @@ -123,6 +127,9 @@ pub struct PartiqlCatalog {
id: CatalogId,
}

#[derive(Debug)]
pub struct PartiqlSharedCatalog(PartiqlCatalog);

impl Default for PartiqlCatalog {
fn default() -> Self {
PartiqlCatalog {
Expand All @@ -133,9 +140,15 @@ impl Default for PartiqlCatalog {
}
}

impl PartiqlCatalog {}
impl PartiqlCatalog {
pub fn to_shared_catalog(self) -> PartiqlSharedCatalog {
PartiqlSharedCatalog(self)
}
}

impl Catalog for PartiqlCatalog {}

impl Catalog for PartiqlCatalog {
impl MutableCatalog for PartiqlCatalog {
fn add_table_function(&mut self, info: TableFunction) -> Result<ObjectId, CatalogError> {
let call_def = info.call_def();
let names = call_def.names.clone();
Expand Down Expand Up @@ -180,7 +193,9 @@ impl Catalog for PartiqlCatalog {
Err(e) => Err(e),
}
}
}

impl ReadOnlyCatalog for PartiqlCatalog {
fn get_function(&self, name: &str) -> Option<FunctionEntry<'_>> {
self.functions
.find_by_name(name)
Expand All @@ -202,6 +217,18 @@ impl Catalog for PartiqlCatalog {
}
}

impl ReadOnlyCatalog for PartiqlSharedCatalog {
delegate! {
to self.0 {
fn get_function(&self, name: &str) -> Option<FunctionEntry<'_>>;
fn get_function_by_id(&self, id: ObjectId) -> Option<FunctionEntry<'_>>;
fn resolve_type(&self, name: &str) -> Option<TypeEntry>;
}
}
}

impl SharedCatalog for PartiqlSharedCatalog {}

impl PartiqlCatalog {
fn to_function_entry<'a>(
&'a self,
Expand Down
2 changes: 1 addition & 1 deletion partiql-catalog/src/scalar_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::fmt::{Debug, Formatter};
pub type ScalarFnExprResultValue<'a> = Cow<'a, Value>;
pub type ScalarFnExprResult<'a> = Result<ScalarFnExprResultValue<'a>, ExtensionResultError>;

pub trait ScalarFnExpr: DynClone + Debug {
pub trait ScalarFnExpr: DynClone + Debug + Send + Sync {
fn evaluate<'c>(
&self,
args: &[Cow<'_, Value>],
Expand Down
2 changes: 1 addition & 1 deletion partiql-catalog/src/table_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub trait BaseTableExpr: Debug {
) -> BaseTableExprResult<'c>;
}

pub trait BaseTableFunctionInfo: Debug {
pub trait BaseTableFunctionInfo: Debug + Send + Sync {
fn call_def(&self) -> &CallDef;
fn plan_eval(&self) -> Box<dyn BaseTableExpr>;
}
Expand Down
28 changes: 18 additions & 10 deletions partiql-conformance-tests/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
use partiql_ast_passes::error::AstTransformationError;
use partiql_eval as eval;
use std::ops::Deref;

use partiql_eval::error::{EvalErr, PlanErr};
use partiql_eval::eval::{BasicContext, EvalContext, EvalPlan, EvalResult, Evaluated};
use partiql_logical as logical;
use partiql_parser::{Parsed, ParserError, ParserResult};
use partiql_value::DateTime;

use partiql_catalog::catalog::{Catalog, PartiqlCatalog};
use partiql_catalog::catalog::{PartiqlCatalog, PartiqlSharedCatalog, SharedCatalog};
use partiql_catalog::context::SystemContext;
use thiserror::Error;

mod test_value;
pub(crate) use test_value::TestValue;

use once_cell::sync::Lazy;
pub(crate) static SHARED_CATALOG: Lazy<PartiqlSharedCatalog> = Lazy::new(init_shared_catalog);

fn init_shared_catalog() -> PartiqlSharedCatalog {
PartiqlCatalog::default().to_shared_catalog()
}

#[derive(Debug, Copy, Clone)]
#[allow(dead_code)]
pub(crate) enum EvaluationMode {
Expand Down Expand Up @@ -52,7 +60,7 @@ pub(crate) fn parse(statement: &str) -> ParserResult {
#[track_caller]
#[inline]
pub(crate) fn lower(
catalog: &dyn Catalog,
catalog: &dyn SharedCatalog,
parsed: &Parsed<'_>,
) -> Result<logical::LogicalPlan<logical::BindingsOp>, AstTransformationError> {
let planner = partiql_logical_planner::LogicalPlanner::new(catalog);
Expand All @@ -63,7 +71,7 @@ pub(crate) fn lower(
#[inline]
pub(crate) fn compile(
mode: EvaluationMode,
catalog: &dyn Catalog,
catalog: &dyn SharedCatalog,
logical: logical::LogicalPlan<logical::BindingsOp>,
) -> Result<EvalPlan, PlanErr> {
let mut planner = eval::plan::EvaluatorPlanner::new(mode.into(), catalog);
Expand Down Expand Up @@ -103,9 +111,9 @@ pub(crate) fn pass_syntax(statement: &str) -> Parsed {
#[inline]
#[allow(dead_code)]
pub(crate) fn fail_semantics(statement: &str) {
let catalog = PartiqlCatalog::default();
let catalog: &PartiqlSharedCatalog = SHARED_CATALOG.deref();
if let Ok(parsed) = parse(statement) {
let lowered = lower(&catalog, &parsed);
let lowered = lower(catalog, &parsed);

assert!(
lowered.is_err(),
Expand All @@ -118,9 +126,9 @@ pub(crate) fn fail_semantics(statement: &str) {
#[inline]
#[allow(dead_code)]
pub(crate) fn pass_semantics(statement: &str) {
let catalog = PartiqlCatalog::default();
let catalog: &PartiqlSharedCatalog = SHARED_CATALOG.deref();
let parsed = pass_syntax(statement);
let lowered = lower(&catalog, &parsed);
let lowered = lower(catalog, &parsed);
assert!(
lowered.is_ok(),
"When semantically verifying `{statement}`, expected `Ok(_)`, but was `{lowered:#?}`"
Expand Down Expand Up @@ -171,17 +179,17 @@ pub(crate) fn eval<'a>(
mode: EvaluationMode,
env: &Option<TestValue>,
) -> Result<Evaluated, TestError<'a>> {
let catalog = PartiqlCatalog::default();
let catalog: &PartiqlSharedCatalog = SHARED_CATALOG.deref();

let parsed = parse(statement)?;
let lowered = lower(&catalog, &parsed)?;
let lowered = lower(catalog, &parsed)?;

let bindings = env.as_ref().map(|e| (&e.value).into()).unwrap_or_default();
let sys = SystemContext {
now: DateTime::from_system_now_utc(),
};
let ctx = BasicContext::new(bindings, sys);
let plan = compile(mode, &catalog, lowered)?;
let plan = compile(mode, catalog, lowered)?;

Ok(evaluate(plan, &ctx)?)
}
Expand Down
1 change: 1 addition & 0 deletions partiql-eval/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ rustc-hash = "2"
delegate = "0.13"

serde = { version = "1", features = ["derive"], optional = true }
once_cell = "1"

[dev-dependencies]
criterion = "0.5"
Expand Down
13 changes: 10 additions & 3 deletions partiql-eval/benches/bench_eval.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::borrow::Cow;
use std::ops::Deref;
use std::time::Duration;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use partiql_catalog::catalog::PartiqlCatalog;
use partiql_catalog::catalog::{PartiqlCatalog, PartiqlSharedCatalog};
use partiql_catalog::context::SystemContext;

use once_cell::sync::Lazy;
use partiql_eval::env::basic::MapBindings;
use partiql_eval::eval::{BasicContext, EvalPlan};
use partiql_eval::plan;
Expand All @@ -16,6 +18,11 @@ use partiql_logical::{
VarRefType,
};
use partiql_value::{bag, list, tuple, BindingsName, DateTime, Value};
pub(crate) static SHARED_CATALOG: Lazy<PartiqlSharedCatalog> = Lazy::new(init_shared_catalog);

fn init_shared_catalog() -> PartiqlSharedCatalog {
PartiqlCatalog::default().to_shared_catalog()
}

fn data() -> MapBindings<Value> {
let hr = tuple![(
Expand Down Expand Up @@ -130,8 +137,8 @@ fn logical_plan() -> LogicalPlan<BindingsOp> {
}

fn eval_plan(logical: &LogicalPlan<BindingsOp>) -> EvalPlan {
let catalog = PartiqlCatalog::default();
let mut planner = plan::EvaluatorPlanner::new(EvaluationMode::Permissive, &catalog);
let catalog: &PartiqlSharedCatalog = SHARED_CATALOG.deref();
let mut planner = plan::EvaluatorPlanner::new(EvaluationMode::Permissive, catalog);
planner.compile(logical).expect("Expect no plan error")
}

Expand Down
2 changes: 1 addition & 1 deletion partiql-eval/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mod tests {
use partiql_value::{bag, list, tuple, Bag, BindingsName, DateTime, List, Tuple, Value};

fn evaluate(logical: LogicalPlan<BindingsOp>, bindings: MapBindings<Value>) -> Value {
let catalog = PartiqlCatalog::default();
let catalog = PartiqlCatalog::default().to_shared_catalog();
let mut planner = plan::EvaluatorPlanner::new(EvaluationMode::Permissive, &catalog);
let plan = planner.compile(&logical).expect("Expect no plan error");
let sys = SystemContext {
Expand Down
8 changes: 4 additions & 4 deletions partiql-eval/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::eval::graph::string_graph::StringGraphTypes;
use crate::eval::EvalPlan;
use eval::graph::plan as physical;
use itertools::{Either, Itertools};
use partiql_catalog::catalog::{Catalog, FunctionEntryFunction};
use partiql_catalog::catalog::{FunctionEntryFunction, SharedCatalog};
use partiql_extension_ion::boxed_ion::BoxedIonType;
use partiql_logical as logical;
use partiql_logical::{
Expand Down Expand Up @@ -61,7 +61,7 @@ pub enum EvaluationMode {

pub struct EvaluatorPlanner<'c> {
mode: EvaluationMode,
catalog: &'c dyn Catalog,
catalog: &'c dyn SharedCatalog,
errors: Vec<PlanningError>,
}

Expand Down Expand Up @@ -126,7 +126,7 @@ impl From<&BinaryOp> for EvalOpBinary {
}

impl<'c> EvaluatorPlanner<'c> {
pub fn new(mode: EvaluationMode, catalog: &'c dyn Catalog) -> Self {
pub fn new(mode: EvaluationMode, catalog: &'c dyn SharedCatalog) -> Self {
EvaluatorPlanner {
mode,
catalog,
Expand Down Expand Up @@ -1056,7 +1056,7 @@ mod tests {
let sink = logical.add_operator(BindingsOp::Sink);
logical.add_flow(expq, sink);

let catalog = PartiqlCatalog::default();
let catalog = PartiqlCatalog::default().to_shared_catalog();
let mut planner = EvaluatorPlanner::new(EvaluationMode::Permissive, &catalog);
let plan = planner.compile(&logical);

Expand Down
Loading
Loading