Skip to content

Commit 1ef01dd

Browse files
authored
chore(cubestore): Realtime improvements (cube-js#5064)
* realtime improvements * fmt * deactivate empty chunks on repartition * fix * fix * fix * fix * fix * fix * fixes and changes * fmt * in work * fix * cleanup * fix
1 parent 5e31063 commit 1ef01dd

File tree

10 files changed

+542
-173
lines changed

10 files changed

+542
-173
lines changed

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -570,10 +570,26 @@ impl Cluster for ClusterImpl {
570570
}
571571

572572
async fn schedule_repartition(&self, p: &IdRow<Partition>) -> Result<(), CubeError> {
573+
let in_memory_node = self.node_name_by_partition(p);
574+
let in_memory_job = self
575+
.meta_store
576+
.add_job(Job::new(
577+
RowKey::Table(TableId::Partitions, p.get_id()),
578+
JobType::Repartition,
579+
in_memory_node.to_string(),
580+
))
581+
.await?;
582+
if in_memory_job.is_some() {
583+
self.notify_job_runner(in_memory_node).await?;
584+
}
585+
573586
let chunks = self
574587
.meta_store
575588
.get_chunks_by_partition(p.get_id(), false)
576-
.await?;
589+
.await?
590+
.into_iter()
591+
.filter(|c| !c.get_row().in_memory())
592+
.collect::<Vec<_>>();
577593

578594
for chunk in chunks {
579595
let node = self.node_name_for_chunk_repartition(&chunk).await?;

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -300,6 +300,10 @@ pub trait ConfigObj: DIService {
300300

301301
fn compaction_in_memory_chunks_count_threshold(&self) -> usize;
302302

303+
fn compaction_in_memory_chunks_ratio_threshold(&self) -> u64;
304+
305+
fn compaction_in_memory_chunks_ratio_check_threshold(&self) -> u64;
306+
303307
fn wal_split_threshold(&self) -> u64;
304308

305309
fn select_worker_pool_size(&self) -> usize;
@@ -316,6 +320,8 @@ pub trait ConfigObj: DIService {
316320

317321
fn not_used_timeout(&self) -> u64;
318322

323+
fn in_memory_not_used_timeout(&self) -> u64;
324+
319325
fn import_job_timeout(&self) -> u64;
320326

321327
fn meta_store_snapshot_interval(&self) -> u64;
@@ -374,6 +380,8 @@ pub struct ConfigObjImpl {
374380
pub compaction_in_memory_chunks_size_limit: u64,
375381
pub compaction_in_memory_chunks_total_size_limit: u64,
376382
pub compaction_in_memory_chunks_count_threshold: usize,
383+
pub compaction_in_memory_chunks_ratio_threshold: u64,
384+
pub compaction_in_memory_chunks_ratio_check_threshold: u64,
377385
pub wal_split_threshold: u64,
378386
pub data_dir: PathBuf,
379387
pub dump_dir: Option<PathBuf>,
@@ -386,6 +394,7 @@ pub struct ConfigObjImpl {
386394
pub query_timeout: u64,
387395
/// Must be set to 2*query_timeout in prod, only for overrides in tests.
388396
pub not_used_timeout: u64,
397+
pub in_memory_not_used_timeout: u64,
389398
pub import_job_timeout: u64,
390399
pub meta_store_log_upload_interval: u64,
391400
pub meta_store_snapshot_interval: u64,
@@ -449,6 +458,14 @@ impl ConfigObj for ConfigObjImpl {
449458
self.compaction_in_memory_chunks_count_threshold
450459
}
451460

461+
fn compaction_in_memory_chunks_ratio_threshold(&self) -> u64 {
462+
self.compaction_in_memory_chunks_ratio_threshold
463+
}
464+
465+
fn compaction_in_memory_chunks_ratio_check_threshold(&self) -> u64 {
466+
self.compaction_in_memory_chunks_ratio_check_threshold
467+
}
468+
452469
fn wal_split_threshold(&self) -> u64 {
453470
self.wal_split_threshold
454471
}
@@ -481,6 +498,10 @@ impl ConfigObj for ConfigObjImpl {
481498
self.not_used_timeout
482499
}
483500

501+
fn in_memory_not_used_timeout(&self) -> u64 {
502+
self.in_memory_not_used_timeout
503+
}
504+
484505
fn import_job_timeout(&self) -> u64 {
485506
self.import_job_timeout
486507
}
@@ -654,6 +675,14 @@ impl Config {
654675
"CUBESTORE_IN_MEMORY_CHUNKS_COUNT_THRESHOLD",
655676
10,
656677
),
678+
compaction_in_memory_chunks_ratio_threshold: env_parse(
679+
"CUBESTORE_IN_MEMORY_CHUNKS_RATIO_THRESHOLD",
680+
3,
681+
),
682+
compaction_in_memory_chunks_ratio_check_threshold: env_parse(
683+
"CUBESTORE_IN_MEMORY_CHUNKS_RATIO_CHECK_THRESHOLD",
684+
1000,
685+
),
657686
store_provider: {
658687
if let Ok(bucket_name) = env::var("CUBESTORE_S3_BUCKET") {
659688
FileStoreProvider::S3 {
@@ -695,6 +724,7 @@ impl Config {
695724
)),
696725
query_timeout,
697726
not_used_timeout: 2 * query_timeout,
727+
in_memory_not_used_timeout: 30,
698728
import_job_timeout: env_parse("CUBESTORE_IMPORT_JOB_TIMEOUT", 600),
699729
meta_store_log_upload_interval: 30,
700730
meta_store_snapshot_interval: 300,
@@ -755,6 +785,8 @@ impl Config {
755785
compaction_in_memory_chunks_size_limit: 262_144 / 4,
756786
compaction_in_memory_chunks_total_size_limit: 262_144,
757787
compaction_in_memory_chunks_count_threshold: 10,
788+
compaction_in_memory_chunks_ratio_threshold: 3,
789+
compaction_in_memory_chunks_ratio_check_threshold: 1000,
758790
store_provider: FileStoreProvider::Filesystem {
759791
remote_dir: Some(
760792
env::current_dir()
@@ -769,6 +801,7 @@ impl Config {
769801
http_bind_address: None,
770802
query_timeout,
771803
not_used_timeout: 2 * query_timeout,
804+
in_memory_not_used_timeout: 30,
772805
import_job_timeout: 600,
773806
stale_stream_timeout: 60,
774807
select_workers: Vec::new(),

rust/cubestore/cubestore/src/metastore/chunks.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ impl Chunk {
2121
in_memory,
2222
created_at: Some(Utc::now()),
2323
oldest_insert_at: Some(Utc::now()),
24+
deactivated_at: None,
2425
suffix: Some(
2526
String::from_utf8(thread_rng().sample_iter(&Alphanumeric).take(8).collect())
2627
.unwrap()
@@ -79,6 +80,7 @@ impl Chunk {
7980
pub fn deactivate(&self) -> Chunk {
8081
let mut to_update = self.clone();
8182
to_update.active = false;
83+
to_update.deactivated_at = Some(Utc::now());
8284
to_update
8385
}
8486

@@ -102,6 +104,10 @@ impl Chunk {
102104
&self.oldest_insert_at
103105
}
104106

107+
pub fn deactivated_at(&self) -> &Option<DateTime<Utc>> {
108+
&self.deactivated_at
109+
}
110+
105111
pub fn suffix(&self) -> &Option<String> {
106112
&self.suffix
107113
}

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 71 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -693,6 +693,8 @@ pub struct Chunk {
693693
#[serde(default)]
694694
oldest_insert_at: Option<DateTime<Utc>>,
695695
#[serde(default)]
696+
deactivated_at: Option<DateTime<Utc>>,
697+
#[serde(default)]
696698
suffix: Option<String>,
697699
#[serde(default)]
698700
file_size: Option<u64>
@@ -1047,12 +1049,23 @@ pub trait MetaStore: DIService + Send + Sync {
10471049
include_inactive: bool,
10481050
) -> Result<Vec<IdRow<Chunk>>, CubeError>;
10491051
async fn chunk_uploaded(&self, chunk_id: u64) -> Result<IdRow<Chunk>, CubeError>;
1052+
async fn chunk_update_last_inserted(
1053+
&self,
1054+
chunk_ids: Vec<u64>,
1055+
last_inserted_at: Option<DateTime<Utc>>,
1056+
) -> Result<(), CubeError>;
10501057
async fn deactivate_chunk(&self, chunk_id: u64) -> Result<(), CubeError>;
1058+
async fn deactivate_chunks(&self, chunk_ids: Vec<u64>) -> Result<(), CubeError>;
10511059
async fn swap_chunks(
10521060
&self,
10531061
deactivate_ids: Vec<u64>,
10541062
uploaded_ids_and_sizes: Vec<(u64, Option<u64>)>,
10551063
) -> Result<(), CubeError>;
1064+
async fn swap_chunks_without_check(
1065+
&self,
1066+
deactivate_ids: Vec<u64>,
1067+
uploaded_ids_and_sizes: Vec<(u64, Option<u64>)>,
1068+
) -> Result<(), CubeError>;
10561069
async fn activate_wal(
10571070
&self,
10581071
wal_id_to_delete: u64,
@@ -3442,6 +3455,7 @@ impl MetaStore for RocksMetaStore {
34423455
vec![(new_chunk, Some(new_chunk_file_size))],
34433456
db,
34443457
pipe,
3458+
false,
34453459
)?;
34463460
Ok(true)
34473461
})
@@ -4120,6 +4134,27 @@ impl MetaStore for RocksMetaStore {
41204134
})
41214135
.await
41224136
}
4137+
async fn chunk_update_last_inserted(
4138+
&self,
4139+
chunk_ids: Vec<u64>,
4140+
last_inserted_at: Option<DateTime<Utc>>,
4141+
) -> Result<(), CubeError> {
4142+
self.write_operation(move |db_ref, batch_pipe| {
4143+
let table = ChunkRocksTable::new(db_ref.clone());
4144+
for chunk_id in chunk_ids {
4145+
let row = table.get_row_or_not_found(chunk_id)?;
4146+
table.update(
4147+
chunk_id,
4148+
row.get_row().set_oldest_insert_at(last_inserted_at),
4149+
row.get_row(),
4150+
batch_pipe,
4151+
)?;
4152+
}
4153+
4154+
Ok(())
4155+
})
4156+
.await
4157+
}
41234158

41244159
async fn deactivate_chunk(&self, chunk_id: u64) -> Result<(), CubeError> {
41254160
self.write_operation(move |db_ref, batch_pipe| {
@@ -4132,6 +4167,16 @@ impl MetaStore for RocksMetaStore {
41324167
})
41334168
.await
41344169
}
4170+
async fn deactivate_chunks(&self, chunk_ids: Vec<u64>) -> Result<(), CubeError> {
4171+
self.write_operation(move |db_ref, batch_pipe| {
4172+
let table = ChunkRocksTable::new(db_ref.clone());
4173+
for chunk_id in chunk_ids {
4174+
table.update_with_fn(chunk_id, |row| row.deactivate(), batch_pipe)?;
4175+
}
4176+
Ok(())
4177+
})
4178+
.await
4179+
}
41354180

41364181
async fn activate_wal(
41374182
&self,
@@ -4217,6 +4262,30 @@ impl MetaStore for RocksMetaStore {
42174262
uploaded_ids_and_sizes,
42184263
db_ref,
42194264
batch_pipe,
4265+
true,
4266+
)
4267+
})
4268+
.await
4269+
}
4270+
4271+
async fn swap_chunks_without_check(
4272+
&self,
4273+
deactivate_ids: Vec<u64>,
4274+
uploaded_ids_and_sizes: Vec<(u64, Option<u64>)>,
4275+
) -> Result<(), CubeError> {
4276+
if uploaded_ids_and_sizes.is_empty() {
4277+
return Err(CubeError::internal(format!(
4278+
"Can't swap chunks: {:?} to {:?} empty",
4279+
deactivate_ids, uploaded_ids_and_sizes
4280+
)));
4281+
}
4282+
self.write_operation(move |db_ref, batch_pipe| {
4283+
RocksMetaStore::swap_chunks_impl(
4284+
deactivate_ids,
4285+
uploaded_ids_and_sizes,
4286+
db_ref,
4287+
batch_pipe,
4288+
false,
42204289
)
42214290
})
42224291
.await
@@ -6288,6 +6357,7 @@ impl RocksMetaStore {
62886357
uploaded_ids_and_sizes: Vec<(u64, Option<u64>)>,
62896358
db_ref: DbTableRef,
62906359
batch_pipe: &mut BatchPipe,
6360+
check_rows: bool,
62916361
) -> Result<(), CubeError> {
62926362
trace!(
62936363
"Swapping chunks: deactivating ({}), activating ({})",
@@ -6340,7 +6410,7 @@ impl RocksMetaStore {
63406410
batch_pipe,
63416411
)?;
63426412
}
6343-
if deactivate_ids.len() > 0 && activated_row_count != deactivated_row_count {
6413+
if check_rows && deactivate_ids.len() > 0 && activated_row_count != deactivated_row_count {
63446414
return Err(CubeError::internal(format!(
63456415
"Deactivated row count ({}) doesn't match activated row count ({}) during swap of ({}) to ({}) chunks",
63466416
deactivated_row_count,

rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs

Lines changed: 40 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -113,6 +113,46 @@ impl InfoSchemaTableDef for SystemChunksTableDef {
113113
))
114114
}),
115115
),
116+
(
117+
Field::new(
118+
"oldest_insert_at",
119+
DataType::Timestamp(TimeUnit::Nanosecond, None),
120+
false,
121+
),
122+
Box::new(|chunks| {
123+
Arc::new(TimestampNanosecondArray::from(
124+
chunks
125+
.iter()
126+
.map(|row| {
127+
row.get_row()
128+
.oldest_insert_at()
129+
.as_ref()
130+
.map(|t| t.timestamp_nanos())
131+
})
132+
.collect::<Vec<_>>(),
133+
))
134+
}),
135+
),
136+
(
137+
Field::new(
138+
"deactivated_at",
139+
DataType::Timestamp(TimeUnit::Nanosecond, None),
140+
false,
141+
),
142+
Box::new(|chunks| {
143+
Arc::new(TimestampNanosecondArray::from(
144+
chunks
145+
.iter()
146+
.map(|row| {
147+
row.get_row()
148+
.deactivated_at()
149+
.as_ref()
150+
.map(|t| t.timestamp_nanos())
151+
})
152+
.collect::<Vec<_>>(),
153+
))
154+
}),
155+
),
116156
(
117157
Field::new("file_size", DataType::UInt64, true),
118158
Box::new(|chunks| {

0 commit comments

Comments (0)