Commit 7bc9243

fix: transient table does not work as expected inside explicit transaction (#18215)
* fix: transient table does not work as expected inside explicit txn
* refine code comments
* refine comments
1 parent 208d264 commit 7bc9243


43 files changed (+678 / -196 lines)

src/query/catalog/src/table_context.rs

Lines changed: 0 additions & 2 deletions
```diff
@@ -360,8 +360,6 @@ pub trait TableContext: Send + Sync {
         previous_snapshot: Option<Arc<TableSnapshot>>,
     ) -> Result<TableMetaTimestamps>;
 
-    fn clear_table_meta_timestamps_cache(&self);
-
     fn get_read_block_thresholds(&self) -> BlockThresholds;
     fn set_read_block_thresholds(&self, _thresholds: BlockThresholds);
```
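
One part of the fix is visible right here: `get_table_meta_timestamps` stays on the `TableContext` trait, while `clear_table_meta_timestamps_cache` leaves the public interface (the hooks below still call it, presumably against the concrete `QueryContext`). The get/clear pair implies a per-session cache that pins timestamps for the duration of a transaction. A minimal sketch of that pattern — `TimestampCache`, its fields, and the `u64` table-id key are illustrative assumptions, not Databend's actual implementation:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for the real TableMetaTimestamps value.
#[derive(Clone, Copy)]
struct Timestamps(u64);

/// A per-session cache keyed by table id: once timestamps are handed out
/// inside a transaction, later statements reuse them until the cache is
/// explicitly cleared.
struct TimestampCache {
    inner: Mutex<HashMap<u64, Timestamps>>,
}

impl TimestampCache {
    /// Returns the cached value for `table_id`, computing it on first use.
    fn get_or_compute(&self, table_id: u64, compute: impl FnOnce() -> Timestamps) -> Timestamps {
        *self.inner.lock().unwrap().entry(table_id).or_insert_with(compute)
    }

    /// Drops all pinned timestamps so the next acquisition re-derives them
    /// from the current table snapshot.
    fn clear(&self) {
        self.inner.lock().unwrap().clear();
    }
}
```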

src/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -165,6 +165,8 @@ async fn compact_table(
         &compact_target.table,
     )?;
 
+    ctx.clear_table_meta_timestamps_cache();
+
     {
         // do compact.
         let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
```

src/query/service/src/interpreters/hook/refresh_hook.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -83,6 +83,8 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
         .await?;
     let table_id = table.get_id();
 
+    ctx.clear_table_meta_timestamps_cache();
+
     let mut plans = Vec::new();
 
     // Generate sync aggregating indexes.
```
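
These two hooks (compaction above, index refresh here) make the same move at the same point: after the target table is resolved and before the follow-up job is planned, cached table meta timestamps are dropped, so the job derives fresh ones from the snapshot it will actually operate on instead of reusing values pinned earlier in the transaction — presumably the scenario behind the transient-table fix.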

src/query/service/src/interpreters/interpreter_copy_into_table.rs

Lines changed: 17 additions & 20 deletions
```diff
@@ -37,6 +37,7 @@ use databend_common_sql::executor::PhysicalPlan;
 use databend_common_storage::StageFileInfo;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_stage::StageTable;
+use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 use log::debug;
 use log::info;
 
@@ -100,21 +101,8 @@ impl CopyIntoTableInterpreter {
         &self,
         table_info: TableInfo,
         plan: &CopyIntoTablePlan,
+        table_meta_timestamps: TableMetaTimestamps,
     ) -> Result<(PhysicalPlan, Vec<UpdateStreamMetaReq>)> {
-        let to_table = self
-            .ctx
-            .get_table(
-                plan.catalog_info.catalog_name(),
-                &plan.database_name,
-                &plan.table_name,
-            )
-            .await?;
-        let snapshot = FuseTable::try_from_table(to_table.as_ref())?
-            .read_table_snapshot()
-            .await?;
-        let table_meta_timestamps = self
-            .ctx
-            .get_table_meta_timestamps(to_table.as_ref(), snapshot)?;
         let mut update_stream_meta_reqs = vec![];
         let (source, project_columns) = if let Some(ref query) = plan.query {
             let query = if plan.enable_distributed {
@@ -244,6 +232,7 @@ impl CopyIntoTableInterpreter {
         update_stream_meta: Vec<UpdateStreamMetaReq>,
         deduplicated_label: Option<String>,
         path_prefix: Option<String>,
+        table_meta_timestamps: TableMetaTimestamps,
     ) -> Result<()> {
         let ctx = self.ctx.clone();
         let to_table = ctx
@@ -264,11 +253,6 @@ impl CopyIntoTableInterpreter {
             path_prefix,
         )?;
 
-        let fuse_table = FuseTable::try_from_table(to_table.as_ref())?;
-        let table_meta_timestamps = ctx.get_table_meta_timestamps(
-            to_table.as_ref(),
-            fuse_table.read_table_snapshot().await?,
-        )?;
         to_table.commit_insertion(
             ctx.clone(),
             main_pipeline,
@@ -379,9 +363,21 @@ impl Interpreter for CopyIntoTableInterpreter {
             return self.on_no_files_to_copy().await;
         }
 
+        let snapshot = FuseTable::try_from_table(to_table.as_ref())?
+            .read_table_snapshot()
+            .await?;
+        let table_meta_timestamps = self
+            .ctx
+            .get_table_meta_timestamps(to_table.as_ref(), snapshot)?;
+
         let (physical_plan, update_stream_meta) = self
-            .build_physical_plan(to_table.get_table_info().clone(), &self.plan)
+            .build_physical_plan(
+                to_table.get_table_info().clone(),
+                &self.plan,
+                table_meta_timestamps,
+            )
             .await?;
+
         let mut build_res =
             build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
 
@@ -405,6 +401,7 @@ impl Interpreter for CopyIntoTableInterpreter {
                 update_stream_meta,
                 unsafe { self.ctx.get_settings().get_deduplicate_label()? },
                 self.plan.path_prefix.clone(),
+                table_meta_timestamps,
             )
             .await?;
         }
```
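
The net effect in this interpreter: `build_physical_plan` and `commit_insertion` previously each fetched the table snapshot and derived `TableMetaTimestamps` on their own, so the plan and the commit could observe different values inside a transaction. Now `execute` derives the timestamps once and threads the same value into both calls — and, via the new parameter, into `ReplaceInterpreter`, which reuses `build_physical_plan` below.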

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 5 additions & 1 deletion
```diff
@@ -14,6 +14,7 @@
 
 use std::sync::Arc;
 
+use chrono::Duration;
 use databend_common_catalog::lock::LockTableOption;
 use databend_common_catalog::table::TableExt;
 use databend_common_exception::ErrorCode;
@@ -35,6 +36,7 @@ use databend_common_sql::plans::InsertValue;
 use databend_common_sql::plans::Plan;
 use databend_common_sql::NameResolutionContext;
 use databend_common_storages_stage::build_streaming_load_pipeline;
+use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 use log::info;
 
 use crate::interpreters::common::check_deduplicate_label;
@@ -117,7 +119,9 @@ impl Interpreter for InsertInterpreter {
             self.ctx
                 .get_table_meta_timestamps(table.as_ref(), snapshot)?
         } else {
-            Default::default()
+            // For non-fuse tables, the table meta timestamps do not matter;
+            // just pass a placeholder value here.
+            TableMetaTimestamps::new(None, Duration::hours(1))
         };
 
         let mut build_res = PipelineBuildResult::create();
```
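
The behavioral point of this hunk is replacing `Default::default()` with an explicit constructor call: every call site must now state where its timestamps come from, and non-fuse tables get a loudly labeled placeholder. A sketch of a constructor with this call shape — the struct's fields and the meaning of its two parameters are assumptions for illustration; the real definition lives in `databend_storages_common_table_meta::meta`:

```rust
use chrono::{DateTime, Duration, Utc};

/// Hypothetical reconstruction; field names are assumptions.
#[derive(Clone, Copy, Debug)]
pub struct TableMetaTimestamps {
    pub base: DateTime<Utc>,
    pub retention: Duration,
}

impl TableMetaTimestamps {
    /// Mirrors the call shape in the diff: an optional previous-snapshot
    /// timestamp plus a retention window. With `None`, everything derives
    /// from the wall clock, which is why it serves as a "don't care"
    /// placeholder for tables that never read these values.
    pub fn new(previous: Option<DateTime<Utc>>, retention: Duration) -> Self {
        let base = previous.unwrap_or_else(Utc::now);
        Self { base, retention }
    }
}

fn main() {
    // The placeholder used for non-fuse tables in the diff above.
    let placeholder = TableMetaTimestamps::new(None, Duration::hours(1));
    println!("{placeholder:?}");
}
```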

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 4 additions & 1 deletion
```diff
@@ -48,6 +48,7 @@ use databend_common_sql::ScalarBinder;
 use databend_common_storage::StageFileInfo;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::FuseTable;
+use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 use databend_storages_common_table_meta::readers::snapshot_reader::TableSnapshotAccessor;
 use databend_storages_common_table_meta::table::ClusterType;
 use parking_lot::RwLock;
@@ -198,6 +199,7 @@ impl ReplaceInterpreter {
                 &self.plan.source,
                 self.plan.schema(),
                 &mut purge_info,
+                table_meta_timestamps,
             )
             .await?;
         if let Some(s) = &select_ctx {
@@ -394,6 +396,7 @@ impl ReplaceInterpreter {
         source: &'a InsertInputSource,
         schema: DataSchemaRef,
         purge_info: &mut Option<(Vec<StageFileInfo>, StageInfo, CopyIntoTableOptions)>,
+        table_meta_timestamps: TableMetaTimestamps,
     ) -> Result<ReplaceSourceCtx> {
         match source {
             InsertInputSource::Values(source) => self
@@ -413,7 +416,7 @@ impl ReplaceInterpreter {
                 let interpreter =
                     CopyIntoTableInterpreter::try_create(ctx.clone(), *copy_plan.clone())?;
                 let (physical_plan, _) = interpreter
-                    .build_physical_plan(table_info, &copy_plan)
+                    .build_physical_plan(table_info, &copy_plan, table_meta_timestamps)
                     .await?;
 
                 // TODO optimization: if copy_plan.stage_table_info.files_to_copy is None, there should be a short-cut plan
```

src/query/service/src/interpreters/interpreter_txn_commit.rs

Lines changed: 28 additions & 0 deletions
```diff
@@ -15,17 +15,23 @@
 use std::sync::Arc;
 use std::time::Instant;
 
+use databend_common_base::base::GlobalInstance;
 use databend_common_exception::Result;
+use databend_common_license::license::Feature::Vacuum;
+use databend_common_license::license_manager::LicenseManagerSwitch;
 use databend_common_meta_app::principal::StageInfo;
 use databend_common_metrics::storage::metrics_inc_copy_purge_files_cost_milliseconds;
 use databend_common_metrics::storage::metrics_inc_copy_purge_files_counter;
 use databend_common_storage::init_stage_operator;
 use databend_common_storages_fuse::commit_with_backoff;
+use databend_common_storages_fuse::operations::vacuum_tables_from_info;
 use databend_common_storages_fuse::TableContext;
+use databend_enterprise_vacuum_handler::VacuumHandlerWrapper;
 use databend_storages_common_io::Files;
 use databend_storages_common_session::TxnManagerRef;
 use log::error;
 use log::info;
+use log::warn;
 
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
@@ -83,6 +89,28 @@ pub async fn execute_commit_statement(ctx: Arc<dyn TableContext>) -> Result<()>
         for (stage_info, files) in need_purge_files {
             try_purge_files(ctx.clone(), &stage_info, &files).await;
         }
+
+        let tables_need_purge = { ctx.txn_mgr().lock().table_need_purge() };
+
+        if !tables_need_purge.is_empty() {
+            if LicenseManagerSwitch::instance()
+                .check_enterprise_enabled(ctx.get_license_key(), Vacuum)
+                .is_ok()
+            {
+                let handler: Arc<VacuumHandlerWrapper> = GlobalInstance::get();
+                let num_tables = tables_need_purge.len();
+                info!("Vacuuming {num_tables} tables after transaction commit");
+                if let Err(e) =
+                    vacuum_tables_from_info(tables_need_purge, ctx.clone(), handler).await
+                {
+                    warn!("Failed to vacuum tables after transaction commit (best-effort operation): {e}");
+                } else {
+                    info!("{num_tables} tables vacuumed after transaction commit in a best-effort manner");
+                }
+            } else {
+                warn!("EE feature is not enabled, vacuum after transaction commit is skipped");
+            }
+        }
     }
     Ok(())
 }
```
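
Note the error handling: vacuuming after commit is strictly best-effort. By the time it runs, the transaction's changes are already durable, so a cleanup failure is logged and swallowed rather than returned — failing the statement here would report an error for a commit that actually succeeded. A self-contained sketch of that pattern (the function and its `String` error type are illustrative, not the real signatures):

```rust
/// Run a post-commit cleanup job, logging but never propagating failure:
/// the commit is already durable, so cleanup errors must not fail it.
async fn run_best_effort<Fut>(job_name: &str, job: Fut)
where
    Fut: std::future::Future<Output = Result<(), String>>,
{
    match job.await {
        Ok(()) => log::info!("{job_name} finished after transaction commit"),
        Err(e) => log::warn!("{job_name} failed (best-effort operation): {e}"),
    }
}
```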

src/query/service/src/pipelines/builders/builder_copy_into_location.rs

Lines changed: 7 additions & 1 deletion
```diff
@@ -12,10 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use chrono::Duration;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_sql::executor::physical_plans::CopyIntoLocation;
 use databend_common_storages_stage::StageSinkTable;
+use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 
 use crate::pipelines::PipelineBuilder;
 
@@ -32,7 +34,11 @@ impl PipelineBuilder {
             false,
         )?;
 
+        // The stage table being copied into.
         let to_table = StageSinkTable::create(copy.info.clone(), copy.input_table_schema.clone())?;
+
+        // StageSinkTable does not need to uphold the table meta timestamp invariants; just pass a dummy one.
+        let dummy_table_meta_timestamps = TableMetaTimestamps::new(None, Duration::hours(1));
         PipelineBuilder::build_append2table_with_commit_pipeline(
             self.ctx.clone(),
             &mut self.main_pipeline,
@@ -42,7 +48,7 @@ impl PipelineBuilder {
             vec![],
             false,
             unsafe { self.ctx.get_settings().get_deduplicate_label()? },
-            Default::default(),
+            dummy_table_meta_timestamps,
         )
     }
 }
```
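
Same placeholder idea as in `interpreter_insert.rs`, with one refinement: binding the value to a variable named `dummy_table_meta_timestamps` (instead of passing `Default::default()` inline) makes the "don't care" intent explicit at the call site. Per the added comment, `StageSinkTable` writes to a stage location rather than a fuse table, so the timestamp invariants do not apply to it.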
