Commit a10bd38

fix: stream source can't get the same tablemeta (#14935)
* fix: stream can't get the same source table meta
* add stateful test
1 parent e014eeb · commit a10bd38

10 files changed: +150 −64 lines changed

src/query/catalog/src/catalog/interface.rs

Lines changed: 6 additions & 8 deletions

@@ -338,15 +338,13 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
         unimplemented!()
     }

-    async fn stream_source_table(
-        &self,
-        _stream_desc: &str,
-        _tenant: &str,
-        _db_name: &str,
-        _source_table_name: &str,
-    ) -> Result<Arc<dyn Table>> {
+    fn get_stream_source_table(&self, _stream_desc: &str) -> Result<Option<Arc<dyn Table>>> {
         Err(ErrorCode::Unimplemented(
-            "'stream_source_table' not implemented",
+            "'get_stream_source_table' not implemented",
         ))
     }
+
+    fn cache_stream_source_table(&self, _stream: TableInfo, _source: TableInfo) {
+        unimplemented!()
+    }
 }
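The trait change replaces a lookup that re-fetched the source table from the metastore with a get/cache pair keyed by the stream's desc. A minimal sketch of what a map-backed implementation of that pair could look like (the StreamSourceCache name and the trimmed-down TableInfo are illustrative assumptions, not code from this commit):

    use std::collections::HashMap;
    use std::sync::Mutex;

    // Stand-in for the real catalog TableInfo, reduced to what the sketch needs.
    #[derive(Clone)]
    struct TableInfo {
        desc: String, // e.g. "'db_stream'.'s'"
    }

    #[derive(Default)]
    struct StreamSourceCache {
        // stream desc -> source table info pinned for the current transaction
        inner: Mutex<HashMap<String, TableInfo>>,
    }

    impl StreamSourceCache {
        // Mirrors get_stream_source_table: a cache miss is None, not an error.
        fn get_stream_source_table(&self, stream_desc: &str) -> Option<TableInfo> {
            self.inner.lock().unwrap().get(stream_desc).cloned()
        }

        // Mirrors cache_stream_source_table: later reads of the same stream
        // resolve to this exact source table metadata.
        fn cache_stream_source_table(&self, stream: TableInfo, source: TableInfo) {
            self.inner.lock().unwrap().insert(stream.desc, source);
        }
    }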

src/query/catalog/src/catalog/session_catalog.rs

Lines changed: 15 additions & 21 deletions

@@ -289,14 +289,9 @@ impl Catalog for SessionCatalog {
         } else {
             let table = self.inner.get_table(tenant, db_name, table_name).await?;
             if table.engine() == "STREAM" {
-                let source_table = table
-                    .stream_source_table(Arc::new(self.clone()))
-                    .await?
-                    .get_table_info()
-                    .clone();
                 self.txn_mgr
                     .lock()
-                    .add_stream_table(table.get_table_info().clone(), source_table);
+                    .upsert_table_desc_to_id(table.get_table_info().clone());
             }
             Ok(table)
         }

@@ -467,26 +462,25 @@ impl Catalog for SessionCatalog {
         self.inner.get_table_engines()
     }

-    async fn stream_source_table(
-        &self,
-        stream_desc: &str,
-        tenant: &str,
-        db_name: &str,
-        source_table_name: &str,
-    ) -> Result<Arc<dyn Table>> {
+    // Get stream source table from buffer by stream desc.
+    fn get_stream_source_table(&self, stream_desc: &str) -> Result<Option<Arc<dyn Table>>> {
         let is_active = self.txn_mgr.lock().is_active();
         if is_active {
-            let maybe_table = self
-                .txn_mgr
+            self.txn_mgr
                 .lock()
                 .get_stream_table_source(stream_desc)
-                .map(|table_info| self.get_table_by_info(&table_info));
-            if let Some(t) = maybe_table {
-                return t;
-            }
-            self.get_table(tenant, db_name, source_table_name).await
+                .map(|table_info| self.get_table_by_info(&table_info))
+                .transpose()
         } else {
-            self.get_table(tenant, db_name, source_table_name).await
+            Ok(None)
+        }
+    }
+
+    // Cache stream source table to buffer.
+    fn cache_stream_source_table(&self, stream: TableInfo, source: TableInfo) {
+        let is_active = self.txn_mgr.lock().is_active();
+        if is_active {
+            self.txn_mgr.lock().upsert_stream_table(stream, source);
         }
     }
 }
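The new body of get_stream_source_table leans on Option::transpose: mapping a fallible lookup over the optional cache hit yields Option<Result<_>>, and transpose flips it into the Result<Option<_>> the new signature returns. A self-contained illustration (lookup stands in for the cache probe; the error type is simplified):

    fn lookup(found: Option<i32>) -> Result<Option<i32>, String> {
        found
            .map(|v| if v >= 0 { Ok(v) } else { Err("negative".to_string()) })
            .transpose()
    }

    fn main() {
        assert_eq!(lookup(None), Ok(None)); // cache miss -> Ok(None)
        assert_eq!(lookup(Some(1)), Ok(Some(1))); // hit that resolves
        assert_eq!(lookup(Some(-1)), Err("negative".to_string())); // hit that fails
    }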

src/query/catalog/src/table.rs

Lines changed: 0 additions & 9 deletions

@@ -39,7 +39,6 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
 use databend_storages_common_table_meta::table::ChangeType;
 use databend_storages_common_table_meta::table::StreamMode;

-use crate::catalog::Catalog;
 use crate::lock::Lock;
 use crate::plan::DataSourceInfo;
 use crate::plan::DataSourcePlan;

@@ -402,14 +401,6 @@ pub trait Table: Sync + Send {
     fn is_read_only(&self) -> bool {
         false
     }
-
-    async fn stream_source_table(&self, _catalog: Arc<dyn Catalog>) -> Result<Arc<dyn Table>> {
-        Err(ErrorCode::Unimplemented(format!(
-            "The 'stream_source_table' operation is not supported for the table '{}'. Table engine: '{}'.",
-            self.name(),
-            self.get_table_info().engine(),
-        )))
-    }
 }

 #[async_trait::async_trait]

src/query/service/src/interpreters/common/stream.rs

Lines changed: 1 addition & 1 deletion

@@ -48,7 +48,7 @@ pub async fn build_update_stream_meta_seq(
     for table in tables.into_iter() {
         let stream = StreamTable::try_from_table(table.as_ref())?;
         let stream_info = stream.get_table_info();
-        let source_table = stream.source_table(ctx.get_default_catalog()?).await?;
+        let source_table = stream.source_table(ctx.clone()).await?;
         let inner_fuse = FuseTable::try_from_table(source_table.as_ref())?;

         let table_version = inner_fuse.get_table_info().ident.seq;

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 50 additions & 5 deletions

@@ -46,6 +46,7 @@ use databend_common_storage::CopyStatus;
 use databend_common_storage::DataOperator;
 use databend_common_storage::MergeStatus;
 use databend_common_storage::StorageMetrics;
+use databend_common_storages_stream::stream_table::StreamTable;
 use databend_common_users::UserApiProvider;
 use parking_lot::Mutex;
 use parking_lot::RwLock;

@@ -309,15 +310,18 @@ impl QueryContextShared {
         let table_meta_key = (catalog.to_string(), database.to_string(), table.to_string());

         let already_in_cache = { self.tables_refs.lock().contains_key(&table_meta_key) };
-        match already_in_cache {
-            false => self.get_table_to_cache(catalog, database, table).await,
-            true => Ok(self
+        let res = match already_in_cache {
+            false => self.get_table_to_cache(catalog, database, table).await?,
+            true => self
                 .tables_refs
                 .lock()
                 .get(&table_meta_key)
                 .ok_or_else(|| ErrorCode::Internal("Logical error, it's a bug."))?
-                .clone()),
-        }
+                .clone(),
+        };
+
+        self.cache_stream_source_table(res.clone(), catalog).await?;
+        Ok(res)
     }

     #[async_backtrace::framed]

@@ -343,6 +347,47 @@
         }
     }

+    // Cache the source table of a stream table to ensure can get the same table metadata.
+    #[async_backtrace::framed]
+    async fn cache_stream_source_table(&self, table: Arc<dyn Table>, catalog: &str) -> Result<()> {
+        if table.engine() == "STREAM" {
+            let stream = StreamTable::try_from_table(table.as_ref())?;
+            let table_name = stream.source_table_name();
+            let database = stream.source_table_database();
+            let meta_key = (
+                catalog.to_string(),
+                database.to_string(),
+                table_name.to_string(),
+            );
+            let already_in_cache = { self.tables_refs.lock().contains_key(&meta_key) };
+            if !already_in_cache {
+                let stream_desc = &stream.get_table_info().desc;
+                let tenant = self.get_tenant();
+                let catalog = self
+                    .catalog_manager
+                    .get_catalog(tenant.as_str(), catalog, self.session.session_ctx.txn_mgr())
+                    .await?;
+                let source_table = match catalog.get_stream_source_table(stream_desc)? {
+                    Some(source_table) => source_table,
+                    None => {
+                        let source_table = catalog
+                            .get_table(tenant.as_str(), database, table_name)
+                            .await?;
+                        catalog.cache_stream_source_table(
+                            stream.get_table_info().clone(),
+                            source_table.get_table_info().clone(),
+                        );
+                        source_table
+                    }
+                };
+
+                let mut tables_refs = self.tables_refs.lock();
+                tables_refs.entry(meta_key).or_insert(source_table.clone());
+            }
+        }
+        Ok(())
+    }
+
     pub fn evict_table_from_cache(&self, catalog: &str, database: &str, table: &str) -> Result<()> {
         let table_meta_key = (catalog.to_string(), database.to_string(), table.to_string());
         let mut tables_refs = self.tables_refs.lock();
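The closing tables_refs.entry(meta_key).or_insert(...) matters under concurrency: if two readers of the same stream race past the already_in_cache check, only the first insert wins and both see the same source table metadata. A self-contained sketch of that property (names and the u64 "version" are illustrative, not from the codebase):

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    fn main() {
        let cache: Arc<Mutex<HashMap<String, u64>>> = Arc::new(Mutex::new(HashMap::new()));

        let handles: Vec<_> = (0..4u64)
            .map(|version| {
                let cache = Arc::clone(&cache);
                std::thread::spawn(move || {
                    // Each racer tries to publish "its" version of the source
                    // table; or_insert keeps whichever landed first.
                    *cache
                        .lock()
                        .unwrap()
                        .entry("default.db_stream.base".to_string())
                        .or_insert(version)
                })
            })
            .collect();

        let seen: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        // Every thread observed the same cached value.
        assert!(seen.windows(2).all(|w| w[0] == w[1]));
    }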

src/query/storages/common/txn/src/manager.rs

Lines changed: 7 additions & 4 deletions

@@ -152,15 +152,18 @@ impl TxnManager {
         self.txn_buffer.update_table_meta(req, table_info);
     }

-    pub fn add_stream_table(&mut self, stream: TableInfo, source: TableInfo) {
-        self.txn_buffer
-            .table_desc_to_id
-            .insert(stream.desc.clone(), stream.ident.table_id);
+    pub fn upsert_stream_table(&mut self, stream: TableInfo, source: TableInfo) {
         self.txn_buffer
             .stream_tables
             .insert(stream.ident.table_id, StreamSnapshot { stream, source });
     }

+    pub fn upsert_table_desc_to_id(&mut self, table: TableInfo) {
+        self.txn_buffer
+            .table_desc_to_id
+            .insert(table.desc.clone(), table.ident.table_id);
+    }
+
     pub fn get_stream_table_source(&self, stream_desc: &str) -> Option<TableInfo> {
         self.txn_buffer
             .table_desc_to_id
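The old add_stream_table wrote both buffers at once; the split lets session_catalog record the desc-to-id mapping for every stream it resolves while the (stream, source) pair is pinned only when the source is actually looked up. A simplified model of the two buffers (the struct layout and the reduced TableInfo are assumptions based on the methods in this diff):

    use std::collections::BTreeMap;

    #[derive(Clone)]
    struct TableInfo {
        desc: String,
        table_id: u64,
    }

    struct StreamSnapshot {
        stream: TableInfo,
        source: TableInfo,
    }

    #[derive(Default)]
    struct TxnBuffer {
        // stream desc -> table id, updated for every stream table the txn resolves
        table_desc_to_id: BTreeMap<String, u64>,
        // stream table id -> pinned (stream, source) pair, written on first source lookup
        stream_tables: BTreeMap<u64, StreamSnapshot>,
    }

    impl TxnBuffer {
        fn upsert_table_desc_to_id(&mut self, table: TableInfo) {
            self.table_desc_to_id.insert(table.desc.clone(), table.table_id);
        }

        fn upsert_stream_table(&mut self, stream: TableInfo, source: TableInfo) {
            self.stream_tables
                .insert(stream.table_id, StreamSnapshot { stream, source });
        }
    }

    fn main() {
        let mut buf = TxnBuffer::default();
        let stream = TableInfo { desc: "'db_stream'.'s'".to_string(), table_id: 42 };
        let source = TableInfo { desc: "'db_stream'.'base'".to_string(), table_id: 7 };
        buf.upsert_table_desc_to_id(stream.clone()); // seen in the txn
        buf.upsert_stream_table(stream, source); // source pinned for the txn
        assert_eq!(buf.table_desc_to_id["'db_stream'.'s'"], 42);
    }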

src/query/storages/stream/src/stream_table.rs

Lines changed: 9 additions & 15 deletions

@@ -17,7 +17,6 @@ use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Instant;

-use databend_common_catalog::catalog::Catalog;
 use databend_common_catalog::catalog::StorageDescription;
 use databend_common_catalog::plan::block_id_from_location;
 use databend_common_catalog::plan::DataSourcePlan;

@@ -133,11 +132,10 @@ impl StreamTable {
         })
     }

-    pub async fn source_table(&self, catalog: Arc<dyn Catalog>) -> Result<Arc<dyn Table>> {
-        let table = catalog
-            .stream_source_table(
-                &self.stream_info.desc,
-                &self.stream_info.tenant,
+    pub async fn source_table(&self, ctx: Arc<dyn TableContext>) -> Result<Arc<dyn Table>> {
+        let table = ctx
+            .get_table(
+                self.stream_info.catalog(),
                 &self.table_database,
                 &self.table_name,
             )

@@ -192,7 +190,7 @@
         push_downs: Option<PushDownInfo>,
     ) -> Result<(PartStatistics, Partitions)> {
         let start = Instant::now();
-        let table = self.source_table(ctx.get_default_catalog()?).await?;
+        let table = self.source_table(ctx.clone()).await?;
         let fuse_table = FuseTable::try_from_table(table.as_ref())?;

         let fuse_segment_io =

@@ -288,7 +286,7 @@

     #[minitrace::trace]
     pub async fn check_stream_status(&self, ctx: Arc<dyn TableContext>) -> Result<StreamStatus> {
-        let base_table = self.source_table(ctx.get_default_catalog()?).await?;
+        let base_table = self.source_table(ctx).await?;
         let status = if base_table.get_table_info().ident.seq == self.table_version {
             StreamStatus::NoData
         } else {

@@ -343,7 +341,7 @@ impl Table for StreamTable {
             return Ok(StreamMode::AppendOnly);
         }

-        let table = self.source_table(ctx.get_default_catalog()?).await?;
+        let table = self.source_table(ctx).await?;
         let fuse_table = FuseTable::try_from_table(table.as_ref())?;

         let latest_snapshot = fuse_table.read_table_snapshot().await?;

@@ -404,7 +402,7 @@
         ctx: Arc<dyn TableContext>,
         change_type: Option<ChangeType>,
     ) -> Result<Option<TableStatistics>> {
-        let table = self.source_table(ctx.get_default_catalog()?).await?;
+        let table = self.source_table(ctx.clone()).await?;
         let fuse_table = FuseTable::try_from_table(table.as_ref())?;
         let change_type = change_type.unwrap_or(ChangeType::Append);

@@ -485,13 +483,9 @@
         &self,
         ctx: Arc<dyn TableContext>,
     ) -> Result<Box<dyn ColumnStatisticsProvider>> {
-        let table = self.source_table(ctx.get_default_catalog()?).await?;
+        let table = self.source_table(ctx.clone()).await?;
         table.column_statistics_provider(ctx).await
     }
-
-    async fn stream_source_table(&self, catalog: Arc<dyn Catalog>) -> Result<Arc<dyn Table>> {
-        self.source_table(catalog).await
-    }
 }

 fn replace_push_downs(
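Routing source_table through ctx.get_table instead of a fresh catalog handle is the core of the fix: every resolution of the stream's source now funnels through the query-level table cache, so repeated calls within one query return the same pinned metadata even if the base table advances concurrently. A toy model of that guarantee (types and names are illustrative assumptions, not the real TableContext API):

    use std::collections::HashMap;

    struct QueryCtx {
        cache: HashMap<(String, String, String), u64>, // (catalog, db, table) -> table version
        latest_version: u64, // what a fresh metadata fetch would return
    }

    impl QueryCtx {
        // First lookup "fetches" the current version; later lookups reuse it.
        fn get_table(&mut self, catalog: &str, db: &str, table: &str) -> u64 {
            let key = (catalog.to_string(), db.to_string(), table.to_string());
            let fresh = self.latest_version;
            *self.cache.entry(key).or_insert(fresh)
        }
    }

    fn main() {
        let mut ctx = QueryCtx { cache: HashMap::new(), latest_version: 7 };
        let first = ctx.get_table("default", "db_stream", "base");
        ctx.latest_version = 8; // the base table advances mid-query
        let second = ctx.get_table("default", "db_stream", "base");
        assert_eq!(first, second); // both reads see the pinned version
    }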

src/query/storages/system/src/streams_table.rs

Lines changed: 1 addition & 1 deletion

@@ -194,7 +194,7 @@ impl AsyncSystemTable for StreamsTable {
             snapshot_location.push(stream_table.snapshot_loc());

             let mut reason = "".to_string();
-            match stream_table.source_table(ctx.get_default_catalog()?).await {
+            match stream_table.source_table(ctx.clone()).await {
                 Ok(source) => {
                     let fuse_table = FuseTable::try_from_table(source.as_ref())?;
                     if let Some(location) = stream_table.snapshot_loc() {
Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+test stream success
Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. "$CURDIR"/../../../shell_env.sh
+
+echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT
+echo "create database db_stream" | $BENDSQL_CLIENT_CONNECT
+echo "create table db_stream.base(a int) change_tracking = true" | $BENDSQL_CLIENT_CONNECT
+echo "create table db_stream.rand like db_stream.base Engine = Random" | $BENDSQL_CLIENT_CONNECT
+echo "create stream db_stream.s on table db_stream.base" | $BENDSQL_CLIENT_CONNECT
+echo "create table db_stream.sink(a int)" | $BENDSQL_CLIENT_CONNECT
+
+# Define function to write data into the base table.
+write_to_base() {
+    for i in {1..20}; do
+        echo "insert into db_stream.base select * from db_stream.rand limit 10" | $BENDSQL_CLIENT_CONNECT
+
+        if (( i % 5 == 0 )); then
+            echo "optimize table db_stream.base compact" | $BENDSQL_CLIENT_CONNECT
+        fi
+    done
+}
+
+# Define function to consume data from the stream into the sink table.
+consume_from_stream() {
+    for i in {1..10}; do
+        echo "insert into db_stream.sink select a from db_stream.s" | $BENDSQL_CLIENT_CONNECT
+    done
+}
+
+# Start the write and consume operations in parallel
+write_to_base &
+write_pid=$!
+consume_from_stream &
+consume_pid=$!
+
+# Wait for the data writing into the base table to complete
+wait $write_pid
+# Wait for the final consume operation to complete
+wait $consume_pid
+
+# Perform a final consume operation from the stream to ensure all data is consumed
+echo "insert into db_stream.sink select a from db_stream.s" | $BENDSQL_CLIENT_CONNECT
+
+# Fetch the counts and sums from both base and sink tables
+base_count=$(echo "SELECT COUNT(*) FROM db_stream.base;" | $BENDSQL_CLIENT_CONNECT)
+sink_count=$(echo "SELECT COUNT(*) FROM db_stream.sink;" | $BENDSQL_CLIENT_CONNECT)
+base_sum=$(echo "SELECT SUM(a) FROM db_stream.base;" | $BENDSQL_CLIENT_CONNECT)
+sink_sum=$(echo "SELECT SUM(a) FROM db_stream.sink;" | $BENDSQL_CLIENT_CONNECT)
+
+# Compare the counts and sums to verify consistency between the base and sink tables
+if [ "$base_count" -eq "$sink_count" ] && [ "$base_sum" -eq "$sink_sum" ]; then
+    echo "test stream success"
+fi
+
+echo "drop stream if exists db_stream.s" | $BENDSQL_CLIENT_CONNECT
+echo "drop table if exists db_stream.base all" | $BENDSQL_CLIENT_CONNECT
+echo "drop table if exists db_stream.rand all" | $BENDSQL_CLIENT_CONNECT
+echo "drop table if exists db_stream.sink all" | $BENDSQL_CLIENT_CONNECT
+echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT
