Skip to content

Commit 8cbda70

Browse files
authored
chore(storage): refine error message for stream read offset snapshot (#16964)
* chore: refine error message for stream read offset snapshot * fix
1 parent 0c3c0e1 commit 8cbda70

File tree

6 files changed

+38
-37
lines changed

6 files changed

+38
-37
lines changed

โ€Žsrc/query/sql/src/planner/binder/binder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ impl<'a> Binder {
643643
};
644644

645645
match plan.kind() {
646-
QueryKind::Query { .. } | QueryKind::Explain { .. } => {}
646+
QueryKind::Query | QueryKind::Explain => {}
647647
_ => {
648648
let meta_data_guard = self.metadata.read();
649649
let tables = meta_data_guard.tables();

โ€Žsrc/query/storages/fuse/src/operations/changes.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use databend_common_expression::BASE_BLOCK_IDS_COL_NAME;
3939
use databend_storages_common_table_meta::meta::BlockMeta;
4040
use databend_storages_common_table_meta::meta::Location;
4141
use databend_storages_common_table_meta::meta::SegmentInfo;
42+
use databend_storages_common_table_meta::meta::TableSnapshot;
4243
use databend_storages_common_table_meta::table::ChangeType;
4344
use databend_storages_common_table_meta::table::StreamMode;
4445
use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING_BEGIN_VER;
@@ -222,9 +223,8 @@ impl FuseTable {
222223
let latest_segments: HashSet<&Location> =
223224
HashSet::from_iter(&latest_snapshot.segments);
224225

225-
let (base_snapshot, _) =
226-
SnapshotsIO::read_snapshot(base_location.clone(), self.get_operator())
227-
.await?;
226+
let base_snapshot =
227+
self.changes_read_offset_snapshot(base_location).await?;
228228
let base_segments = HashSet::from_iter(&base_snapshot.segments);
229229

230230
// If the base segments are a subset of the latest segments,
@@ -349,8 +349,7 @@ impl FuseTable {
349349
};
350350

351351
let base_segments = if let Some(snapshot) = base {
352-
let (sn, _) =
353-
SnapshotsIO::read_snapshot(snapshot.to_string(), self.get_operator()).await?;
352+
let sn = self.changes_read_offset_snapshot(snapshot).await?;
354353
HashSet::from_iter(sn.segments.clone())
355354
} else {
356355
HashSet::new()
@@ -435,8 +434,7 @@ impl FuseTable {
435434
return self.table_statistics(ctx, true, None).await;
436435
};
437436

438-
let (base_snapshot, _) =
439-
SnapshotsIO::read_snapshot(base_location.clone(), self.get_operator()).await?;
437+
let base_snapshot = self.changes_read_offset_snapshot(base_location).await?;
440438
let base_summary = base_snapshot.summary.clone();
441439
let latest_summary = if let Some(snapshot) = self.read_table_snapshot().await? {
442440
snapshot.summary.clone()
@@ -499,6 +497,19 @@ impl FuseTable {
499497
}
500498
}
501499
}
500+
501+
pub async fn changes_read_offset_snapshot(
502+
&self,
503+
base_location: &String,
504+
) -> Result<Arc<TableSnapshot>> {
505+
match SnapshotsIO::read_snapshot(base_location.to_string(), self.get_operator()).await {
506+
Ok((base_snapshot, _)) => Ok(base_snapshot),
507+
Err(_) => Err(ErrorCode::IllegalStream(format!(
508+
"Failed to read the offset snapshot: {:?}, maybe purged",
509+
base_location
510+
))),
511+
}
512+
}
502513
}
503514

504515
fn replace_push_downs(

โ€Žsrc/query/storages/fuse/src/operations/commit.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -480,22 +480,8 @@ impl FuseTable {
480480
}
481481
}
482482

483-
#[inline]
484-
pub fn is_error_recoverable(e: &ErrorCode, is_table_transient: bool) -> bool {
485-
let code = e.code();
486-
code == ErrorCode::TABLE_VERSION_MISMATCHED
487-
|| (is_table_transient && code == ErrorCode::STORAGE_NOT_FOUND)
488-
}
489-
490-
#[inline]
491-
pub fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool {
492-
// currently, the only error that we know, which indicates there are no side effects
493-
// is TABLE_VERSION_MISMATCHED
494-
e.code() == ErrorCode::TABLE_VERSION_MISMATCHED
495-
}
496-
497483
// check if there are any fuse table legacy options
498-
pub fn remove_legacy_options(table_options: &mut BTreeMap<String, String>) {
484+
fn remove_legacy_options(table_options: &mut BTreeMap<String, String>) {
499485
table_options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC);
500486
}
501487
}

โ€Žsrc/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,21 @@ where F: SnapshotGenerator + Send + 'static
138138
}
139139

140140
fn is_error_recoverable(&self, e: &ErrorCode) -> bool {
141+
let code = e.code();
141142
// When prev_snapshot_id is some, means it is an alter table column modification or truncate.
142143
// In this case if commit to meta fail and error is TABLE_VERSION_MISMATCHED operation will be aborted.
143-
if self.prev_snapshot_id.is_some() && e.code() == ErrorCode::TABLE_VERSION_MISMATCHED {
144+
if self.prev_snapshot_id.is_some() && code == ErrorCode::TABLE_VERSION_MISMATCHED {
144145
return false;
145146
}
146-
FuseTable::is_error_recoverable(e, self.purge)
147+
148+
code == ErrorCode::TABLE_VERSION_MISMATCHED
149+
|| (self.purge && code == ErrorCode::STORAGE_NOT_FOUND)
150+
}
151+
152+
fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool {
153+
// currently, the only error that we know, which indicates there are no side effects
154+
// is TABLE_VERSION_MISMATCHED
155+
e.code() == ErrorCode::TABLE_VERSION_MISMATCHED
147156
}
148157

149158
fn read_meta(&mut self) -> Result<Event> {
@@ -469,7 +478,7 @@ where F: SnapshotGenerator + Send + 'static
469478
None => {
470479
// Commit not fulfilled. try to abort the operations.
471480
// if it is safe to do so.
472-
if FuseTable::no_side_effects_in_meta_store(&e) {
481+
if Self::no_side_effects_in_meta_store(&e) {
473482
// if we are sure that table state inside metastore has not been
474483
// modified by this operation, abort this operation.
475484
self.state = State::Abort(e);

โ€Žsrc/query/storages/stream/src/stream_table.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use databend_common_pipeline_core::Pipeline;
4242
use databend_common_sql::binder::STREAM_COLUMN_FACTORY;
4343
use databend_common_storages_fuse::io::MetaReaders;
4444
use databend_common_storages_fuse::io::SnapshotHistoryReader;
45-
use databend_common_storages_fuse::io::SnapshotsIO;
4645
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
4746
use databend_common_storages_fuse::FuseTable;
4847
use databend_storages_common_table_meta::table::ChangeType;
@@ -173,8 +172,7 @@ impl StreamTable {
173172
fuse_table.check_changes_valid(source_desc, self.offset()?)?;
174173

175174
let (base_row_count, base_timsestamp) = if let Some(base_loc) = self.snapshot_loc() {
176-
let (base, _) =
177-
SnapshotsIO::read_snapshot(base_loc.to_string(), fuse_table.get_operator()).await?;
175+
let base = fuse_table.changes_read_offset_snapshot(&base_loc).await?;
178176
(base.summary.row_count, base.timestamp)
179177
} else {
180178
(0, None)

โ€Žsrc/query/storages/system/src/streams_table.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ use databend_common_meta_app::principal::OwnershipObject;
3939
use databend_common_meta_app::schema::TableIdent;
4040
use databend_common_meta_app::schema::TableInfo;
4141
use databend_common_meta_app::schema::TableMeta;
42-
use databend_common_storages_fuse::io::SnapshotsIO;
4342
use databend_common_storages_fuse::operations::acquire_task_permit;
4443
use databend_common_storages_fuse::FuseTable;
4544
use databend_common_storages_stream::stream_table::StreamTable;
@@ -250,13 +249,11 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
250249
let fuse_table =
251250
FuseTable::try_from_table(source.as_ref()).unwrap();
252251
if let Some(location) = stream_table.snapshot_loc() {
253-
reason = SnapshotsIO::read_snapshot(
254-
location,
255-
fuse_table.get_operator(),
256-
)
257-
.await
258-
.err()
259-
.map_or("".to_string(), |e| e.display_text());
252+
reason = fuse_table
253+
.changes_read_offset_snapshot(&location)
254+
.await
255+
.err()
256+
.map_or("".to_string(), |e| e.display_text());
260257
}
261258
}
262259
Err(e) => {

0 commit comments

Comments
ย (0)