File tree Expand file tree Collapse file tree 2 files changed +19
-0
lines changed Expand file tree Collapse file tree 2 files changed +19
-0
lines changed Original file line number Diff line number Diff line change @@ -473,6 +473,7 @@ pub struct TaskContext {
473
473
pub runtime_kind : RuntimeKind ,
474
474
/// Event Sender for Timer Wheel Core.
475
475
pub ( crate ) timer_event_sender : Option < TimerEventSender > ,
476
+ pub ( crate ) could_send_finish_event : Option < AsyncReceiver < ( ) > > ,
476
477
}
477
478
478
479
impl TaskContext {
@@ -496,6 +497,15 @@ impl TaskContext {
496
497
self
497
498
}
498
499
500
+ #[ inline( always) ]
501
+ pub ( crate ) fn could_send_finish_event (
502
+ & mut self ,
503
+ could_send_finish_event : AsyncReceiver < ( ) > ,
504
+ ) -> & mut Self {
505
+ self . could_send_finish_event = Some ( could_send_finish_event) ;
506
+ self
507
+ }
508
+
499
509
#[ inline( always) ]
500
510
/// Get hook functions that may be used in the future.
501
511
pub fn then_fn ( & mut self , then_fn : fn ( ) ) -> & mut Self {
@@ -512,6 +522,10 @@ impl TaskContext {
512
522
/// Send a task-Finish signal to EventHandle.
513
523
pub async fn finish_task ( self , finish_output : Option < FinishOutput > ) {
514
524
if let Some ( timer_event_sender) = self . timer_event_sender {
525
+ // Wait for the TimerEvent::AppendTaskHandle event to be sent.
526
+ if let Some ( ref wait) = self . could_send_finish_event {
527
+ let _ = wait. recv ( ) . await ;
528
+ }
515
529
timer_event_sender
516
530
. send ( TimerEvent :: FinishTask ( FinishTaskBody {
517
531
task_id : self . task_id ,
Original file line number Diff line number Diff line change @@ -323,11 +323,14 @@ impl Timer {
323
323
}
324
324
}
325
325
326
+ let ( s, r) = channel:: bounded :: < ( ) > ( 1 ) ;
327
+
326
328
let mut task_context = TaskContext :: default ( ) ;
327
329
task_context
328
330
. task_id ( task_id)
329
331
. record_id ( record_id)
330
332
. timer_event_sender ( self . timer_event_sender . clone ( ) )
333
+ . could_send_finish_event ( r)
331
334
. runtime_kind ( self . shared_header . runtime_instance . kind ) ;
332
335
333
336
let task_handler_box = self . routine_exec ( & * ( task. routine . 0 ) , task_context) ;
@@ -341,6 +344,8 @@ impl Timer {
341
344
. spawn ( task_handler_box) ;
342
345
343
346
self . send_timer_event ( task_id, tmp_task_handler_box) . await ;
347
+ // The TimerEvent::AppendTaskHandle event should always be sent before The TimerEvent::FinishTask event.
348
+ let _ = s. try_send ( ( ) ) ;
344
349
345
350
let task_valid = task. down_count_and_set_vaild ( ) ;
346
351
if !task_valid {
You can’t perform that action at this time.
0 commit comments