Skip to content

Commit ed593ea

Browse files
authored
feat: add initial internal catalog (#30)
## Problem

A new event (an insert, a delete, or a schema change) might invalidate the memoized winner of a goal (group + required physical properties). With our catalog, we can track these events using epochs and reason about the freshness of the schema and statistics information.

## Summary of changes

- Added `events` and other catalog-related tables.
- Added a minimal catalog interface.

_misc_

- Fixed the `optd-datafusion-cli` store issue.

## Future work

- Add constraints and statistics as we need them in the system.
- Embed epoch IDs into schema and statistics.

---------

Signed-off-by: Yuchen Liang <yuchenl3@andrew.cmu.edu>
1 parent fc61df0 commit ed593ea

14 files changed

+365
-3
lines changed

optd-core/src/catalog/mod.rs

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
use std::sync::Arc;
2+
3+
use sqlx::prelude::FromRow;
4+
5+
use crate::storage::memo::SqliteMemo;
6+
7+
#[trait_variant::make(Send)]
8+
pub trait Catalog {
9+
async fn create_database(&self, db_name: &str) -> anyhow::Result<Arc<DatabaseMetadata>>;
10+
11+
async fn get_database(&self, db_name: &str) -> anyhow::Result<Arc<DatabaseMetadata>>;
12+
13+
async fn create_namespace(
14+
&self,
15+
database_id: DatabaseId,
16+
namespace_name: &str,
17+
) -> anyhow::Result<Arc<NamespaceMetadata>>;
18+
19+
async fn get_namespace(
20+
&self,
21+
db_name: &str,
22+
namespace_name: &str,
23+
) -> anyhow::Result<Arc<NamespaceMetadata>>;
24+
25+
async fn create_table(
26+
&self,
27+
namespace_id: NamespaceId,
28+
table_name: &str,
29+
schema: &Schema,
30+
) -> anyhow::Result<Arc<TableMetadata>>;
31+
32+
async fn get_table(
33+
&self,
34+
db_name: &str,
35+
namespace_name: &str,
36+
table_name: &str,
37+
) -> anyhow::Result<Arc<TableMetadata>>;
38+
39+
async fn get_schema(&self, table_id: TableId) -> anyhow::Result<Schema>;
40+
}
41+
42+
pub struct OptdCatalog {
43+
storage: Arc<SqliteMemo>,
44+
}
45+
46+
impl Catalog for OptdCatalog {
47+
async fn create_database(&self, db_name: &str) -> anyhow::Result<Arc<DatabaseMetadata>> {
48+
let mut txn = self.storage.begin().await?;
49+
let db: DatabaseMetadata =
50+
sqlx::query_as("INSERT INTO database_metadata (name) VALUES (?) RETURNING id, name")
51+
.bind(db_name)
52+
.fetch_one(&mut *txn)
53+
.await
54+
.map_err(|e| {
55+
anyhow::anyhow!("Failed to create database metadata for {}: {}", db_name, e)
56+
})?;
57+
txn.commit().await?;
58+
Ok(Arc::new(db))
59+
}
60+
61+
async fn get_database(&self, db_name: &str) -> anyhow::Result<Arc<DatabaseMetadata>> {
62+
let mut txn = self.storage.begin().await?;
63+
let db: DatabaseMetadata =
64+
sqlx::query_as("SELECT id, name FROM database_metadata WHERE name = ?")
65+
.bind(db_name)
66+
.fetch_one(&mut *txn)
67+
.await
68+
.map_err(|e| {
69+
anyhow::anyhow!("Failed to get database metadata for {}: {}", db_name, e)
70+
})?;
71+
txn.commit().await?;
72+
Ok(Arc::new(db))
73+
}
74+
75+
async fn create_namespace(
76+
&self,
77+
database_id: DatabaseId,
78+
namespace_name: &str,
79+
) -> anyhow::Result<Arc<NamespaceMetadata>> {
80+
let mut txn = self.storage.begin().await?;
81+
let namespace: NamespaceMetadata = sqlx::query_as(
82+
"INSERT INTO namespace_metadata (name, database_id) VALUES (?, ?) RETURNING id, name",
83+
)
84+
.bind(namespace_name)
85+
.bind(database_id)
86+
.fetch_one(&mut *txn)
87+
.await
88+
.map_err(|e| {
89+
anyhow::anyhow!(
90+
"Failed to create namespace metadata for {}: {}",
91+
namespace_name,
92+
e
93+
)
94+
})?;
95+
txn.commit().await?;
96+
Ok(Arc::new(namespace))
97+
}
98+
99+
async fn get_namespace(
100+
&self,
101+
db_name: &str,
102+
namespace_name: &str,
103+
) -> anyhow::Result<Arc<NamespaceMetadata>> {
104+
let mut txn = self.storage.begin().await?;
105+
let namespace: NamespaceMetadata = sqlx::query_as(
106+
"SELECT namespace_metadata.id, namespace_metadata.name FROM namespace_metadata, database_metadata WHERE database_metadata.name = ? and namspace_metadata.name = ? and namespace_metadata.database_id = database_metadata.id",
107+
)
108+
.bind(db_name)
109+
.bind(namespace_name)
110+
.fetch_one(&mut *txn)
111+
.await
112+
.map_err(|e| {
113+
anyhow::anyhow!(
114+
"Failed to get namespace metadata for {}.{}: {}",
115+
db_name,
116+
namespace_name,
117+
e
118+
)
119+
})?;
120+
txn.commit().await?;
121+
Ok(Arc::new(namespace))
122+
}
123+
124+
async fn create_table(
125+
&self,
126+
namespace_id: NamespaceId,
127+
table_name: &str,
128+
schema: &Schema,
129+
) -> anyhow::Result<Arc<TableMetadata>> {
130+
let mut txn = self.storage.begin().await?;
131+
let table: TableMetadata = sqlx::query_as(
132+
"INSERT INTO table_metadata (name, namespace_id) VALUES (?, ?) RETURNING id, name",
133+
)
134+
.bind(table_name)
135+
.bind(namespace_id)
136+
.fetch_one(&mut *txn)
137+
.await
138+
.map_err(|e| {
139+
anyhow::anyhow!("Failed to create table metadata for {}: {}", table_name, e)
140+
})?;
141+
for (i, attribute) in schema.attributes.iter().enumerate() {
142+
sqlx::query("INSERT INTO attributes (name, kind, table_id, base_attribute_number) VALUES (?, ?, ?, ?)")
143+
.bind(&attribute.name)
144+
.bind(&attribute.kind)
145+
.bind(table.id)
146+
.bind(i as i64)
147+
.execute(&mut *txn)
148+
.await
149+
.map_err(|e| {
150+
anyhow::anyhow!(
151+
"Failed to create attribute metadata for {}.{}: {}",
152+
table_name,
153+
attribute.name,
154+
e
155+
)
156+
})?;
157+
}
158+
txn.commit().await?;
159+
Ok(Arc::new(table))
160+
}
161+
162+
async fn get_table(
163+
&self,
164+
db_name: &str,
165+
namespace_name: &str,
166+
table_name: &str,
167+
) -> anyhow::Result<Arc<TableMetadata>> {
168+
let mut txn = self.storage.begin().await?;
169+
let table: TableMetadata = sqlx::query_as(
170+
"SELECT table_metadata.id, table_metadata.name FROM table_metadata, namespace_metadata, database_metadata WHERE database_metadata.name = ? and namspace_metadata.name = ? and namespace_metadata.database_id = database_metadata.id and table_metadata.namespace_id = namespace_metadata.id and table_metadata.name = ?",
171+
)
172+
.bind(db_name)
173+
.bind(namespace_name)
174+
.bind(table_name)
175+
.fetch_one(&mut *txn)
176+
.await
177+
.map_err(|e| {
178+
anyhow::anyhow!(
179+
"Failed to get table metadata for {}.{}.{}: {}",
180+
db_name,
181+
namespace_name,
182+
table_name,
183+
e
184+
)
185+
})?;
186+
txn.commit().await?;
187+
Ok(Arc::new(table))
188+
}
189+
190+
async fn get_schema(&self, table_id: TableId) -> anyhow::Result<Schema> {
191+
let mut txn = self.storage.begin().await?;
192+
let attributes: Vec<Attribute> = sqlx::query_as(
193+
"SELECT attributes.id, attributes.name, attributes.kind FROM attributes WHERE attributes.table_id = ?",
194+
)
195+
.bind(table_id)
196+
.fetch_all(&mut *txn)
197+
.await
198+
.map_err(|e| {
199+
anyhow::anyhow!(
200+
"Failed to get schema metadata for table {:?}: {}",
201+
table_id,
202+
e
203+
)
204+
})?;
205+
206+
Ok(Schema { attributes })
207+
}
208+
}
209+
210+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
211+
#[repr(transparent)]
212+
#[sqlx(transparent)]
213+
pub struct DatabaseId(i64);
214+
215+
#[derive(Debug, Clone, PartialEq, Eq, FromRow, sqlx::Type)]
216+
pub struct DatabaseMetadata {
217+
pub id: DatabaseId,
218+
pub name: String,
219+
}
220+
221+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
222+
#[repr(transparent)]
223+
#[sqlx(transparent)]
224+
pub struct NamespaceId(i64);
225+
226+
#[derive(Debug, Clone, PartialEq, Eq, FromRow, sqlx::Type)]
227+
pub struct NamespaceMetadata {
228+
pub id: NamespaceId,
229+
pub name: String,
230+
}
231+
232+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
233+
#[repr(transparent)]
234+
#[sqlx(transparent)]
235+
pub struct TableId(i64);
236+
237+
#[derive(Debug, Clone, PartialEq, Eq, FromRow, sqlx::Type)]
238+
pub struct TableMetadata {
239+
pub id: TableId,
240+
pub name: String,
241+
}
242+
243+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
244+
#[repr(transparent)]
245+
#[sqlx(transparent)]
246+
pub struct AttributeId(i64);
247+
248+
#[derive(Debug, Clone, PartialEq, Eq, FromRow, sqlx::Type)]
249+
pub struct Attribute {
250+
/// The unique identifier for the attribute.
251+
pub id: AttributeId,
252+
/// The name of the attribute.
253+
pub name: String,
254+
/// The kind (data type) of the attribute.
255+
pub kind: String,
256+
}
257+
258+
pub struct Schema {
259+
/// The attributes in the schema.
260+
pub attributes: Vec<Attribute>,
261+
}
262+
263+
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a database, a namespace, and a two-attribute table on an
    /// in-memory store, then checks the table metadata and the schema read
    /// back from the catalog.
    #[tokio::test]
    async fn test_schema() -> anyhow::Result<()> {
        let catalog = OptdCatalog {
            storage: Arc::new(SqliteMemo::new_in_memory().await?),
        };

        let db = catalog.create_database("test_db").await?;
        let namespace = catalog.create_namespace(db.id, "test_namespace").await?;

        // Small constructor to keep the schema literal readable.
        let attribute = |id: i64, name: &str, kind: &str| Attribute {
            id: AttributeId(id),
            name: name.to_string(),
            kind: kind.to_string(),
        };
        let schema = Schema {
            attributes: vec![attribute(1, "id", "INTEGER"), attribute(2, "name", "TEXT")],
        };

        let table = catalog
            .create_table(namespace.id, "test_table", &schema)
            .await?;
        assert_eq!(table.name, "test_table");
        assert_eq!(table.id, TableId(1));

        let stored = catalog.get_schema(table.id).await?;
        let observed: Vec<(&str, &str)> = stored
            .attributes
            .iter()
            .map(|a| (a.name.as_str(), a.kind.as_str()))
            .collect();
        assert_eq!(observed, [("id", "INTEGER"), ("name", "TEXT")]);

        Ok(())
    }
}

optd-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#[allow(dead_code)]
22
pub mod cascades;
3+
pub mod catalog;
34
pub mod dsl;
45
pub mod engine;
56
pub mod operators;

optd-core/src/storage/memo.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl SqliteMemo {
7474
}
7575

7676
/// Begin a new transaction.
///
/// Starts a transaction on the underlying database handle (`self.db`) and
/// wraps it with `Transaction::new`. An error from either step is returned
/// to the caller.
77-
pub(super) async fn begin(&self) -> anyhow::Result<Transaction<'_>> {
77+
pub(crate) async fn begin(&self) -> anyhow::Result<Transaction<'_>> {
7878
let txn = self.db.begin().await?;
7979
Transaction::new(txn).await
8080
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- Remove the `events` table.
DROP TABLE events;
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- Log of events; each recorded event opens a new epoch.
CREATE TABLE events (
    -- Epoch identifier. A new epoch is created every time an event happens.
    epoch_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
    -- The kind of event that occurred (for example an insert, a delete, or a schema change).
    -- TODO(yuchen): need more information than the event type to track what is modified.
    event_type TEXT NOT NULL,
    -- Time at which the event occurred.
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- Remove the `database_metadata` table.
DROP TABLE database_metadata;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- One row per database known to the catalog.
CREATE TABLE database_metadata (
    -- Identifier of the database.
    id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
    -- Name of the database.
    name TEXT NOT NULL,
    -- Time at which the database was created.
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- Remove the `namespace_metadata` table.
DROP TABLE namespace_metadata;
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- One row per namespace; every namespace belongs to exactly one database.
CREATE TABLE namespace_metadata (
    -- Identifier of the namespace.
    id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
    -- Name of the namespace.
    name TEXT NOT NULL,
    -- The database that the namespace belongs to.
    database_id BIGINT NOT NULL,
    -- Time at which the namespace was created.
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,

    -- Deleting a database also deletes its namespaces.
    FOREIGN KEY (database_id) REFERENCES database_metadata(id) ON DELETE CASCADE
);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- Remove the `table_metadata` table.
DROP TABLE table_metadata;

0 commit comments

Comments
 (0)