@@ -18,7 +18,11 @@ use common_datablocks::DataBlock;
18
18
use common_datavalues:: prelude:: * ;
19
19
use common_exception:: Result ;
20
20
use common_storages_table_meta:: meta:: TableSnapshotLite ;
21
+ use futures:: stream:: StreamExt ;
22
+ use futures:: stream:: TryStreamExt ;
21
23
24
+ use crate :: io:: MetaReaders ;
25
+ use crate :: io:: SnapshotHistoryReader ;
22
26
use crate :: io:: SnapshotsIO ;
23
27
use crate :: io:: TableMetaLocationGenerator ;
24
28
use crate :: sessions:: TableContext ;
@@ -45,16 +49,45 @@ impl<'a> FuseSnapshot<'a> {
45
49
self . table . operator . clone ( ) ,
46
50
snapshot_version,
47
51
) ;
48
- let ( snapshots, _) = snapshots_io
49
- . read_snapshot_lites (
50
- snapshot_location,
51
- limit,
52
- false ,
53
- snapshot. and_then ( |s| s. timestamp ) ,
54
- & |_| { } ,
55
- )
56
- . await ?;
57
- return self . to_block ( & meta_location_generator, & snapshots, snapshot_version) ;
52
+ let snapshot_lite = match limit {
53
+ None => {
54
+ // Use SnapshotIO only if limitation is None.
55
+ //
56
+ // SnapshotsIO lists snapshots from object storage, if we limit the number of
57
+ // items being list , there might be the case that the snapshots returned
58
+ // can not be chained together.
59
+ let ( snapshots, _) = snapshots_io
60
+ . read_snapshot_lites (
61
+ snapshot_location,
62
+ None ,
63
+ false ,
64
+ snapshot. and_then ( |s| s. timestamp ) ,
65
+ & |_| { } ,
66
+ )
67
+ . await ?;
68
+ Ok ( snapshots)
69
+ }
70
+ Some ( l) => {
71
+ // SnapshotHistoryReader (which TableSnapshotReader impls) traverses the history
72
+ // of snapshot sequentially, by using the TableSnapshot::prev_snapshot_id, which
73
+ // guarantees the snapshot returned can be chained together
74
+ let table_snapshot_reader = MetaReaders :: table_snapshot_reader (
75
+ self . ctx . get_data_operator ( ) ?. operator ( ) ,
76
+ ) ;
77
+ table_snapshot_reader
78
+ . snapshot_history (
79
+ snapshot_location,
80
+ snapshot_version,
81
+ meta_location_generator. clone ( ) ,
82
+ )
83
+ . map_ok ( |snapshot| TableSnapshotLite :: from ( snapshot. as_ref ( ) ) )
84
+ . take ( l)
85
+ . try_collect :: < Vec < _ > > ( )
86
+ . await
87
+ }
88
+ } ?;
89
+
90
+ return self . to_block ( & meta_location_generator, & snapshot_lite, snapshot_version) ;
58
91
}
59
92
Ok ( DataBlock :: empty_with_schema ( FuseSnapshot :: schema ( ) ) )
60
93
}
0 commit comments