File tree Expand file tree Collapse file tree 2 files changed +18
-2
lines changed
rust/cubestore/cubestore/src Expand file tree Collapse file tree 2 files changed +18
-2
lines changed Original file line number Diff line number Diff line change 1
1
use crate :: cluster:: { pick_worker_by_ids, Cluster } ;
2
2
use crate :: config:: ConfigObj ;
3
- use crate :: metastore:: job:: { Job , JobType } ;
3
+ use crate :: metastore:: job:: { Job , JobStatus , JobType } ;
4
4
use crate :: metastore:: partition:: partition_file_name;
5
5
use crate :: metastore:: table:: Table ;
6
6
use crate :: metastore:: {
@@ -490,6 +490,21 @@ impl SchedulerImpl {
490
490
}
491
491
}
492
492
}
493
+ if let MetaStoreEvent :: UpdateJob ( _, new_job) = & event {
494
+ match new_job. get_row ( ) . job_type ( ) {
495
+ JobType :: TableImportCSV ( location) if Table :: is_stream_location ( location) => {
496
+ match new_job. get_row ( ) . status ( ) {
497
+ JobStatus :: Error ( e) if e. contains ( "Stale stream timeout" ) => {
498
+ log:: info!( "Removing stale stream job: {:?}" , new_job) ;
499
+ self . meta_store . delete_job ( new_job. get_id ( ) ) . await ?;
500
+ self . reconcile_table_imports ( ) . await ?;
501
+ }
502
+ _ => { }
503
+ }
504
+ }
505
+ _ => { }
506
+ }
507
+ }
493
508
if let MetaStoreEvent :: DeleteJob ( job) = event {
494
509
match job. get_row ( ) . job_type ( ) {
495
510
JobType :: RepartitionChunk => match job. get_row ( ) . row_reference ( ) {
Original file line number Diff line number Diff line change @@ -118,7 +118,8 @@ impl StreamingService for StreamingServiceImpl {
118
118
Duration :: from_secs ( self . config_obj . stale_stream_timeout ( ) ) ,
119
119
stream. next ( ) ,
120
120
)
121
- . await ?
121
+ . await
122
+ . map_err ( |e| CubeError :: user ( format ! ( "Stale stream timeout: {}" , e) ) ) ?
122
123
{
123
124
let rows = new_rows?;
124
125
debug ! ( "Received {} rows for {}" , rows. len( ) , location) ;
You can’t perform that action at this time.
0 commit comments