@@ -174,17 +174,19 @@ impl EventHandle {
174
174
175
175
let dispatch_result = self . event_dispatch ( event) . await ;
176
176
177
- if let Ok ( public_event) = dispatch_result
178
- . map_err ( |e| {
177
+ match dispatch_result {
178
+ Ok ( event_sync_mark) if event_sync_mark => {
179
+ if let Ok ( public_event) = public_event_result {
180
+ status_report_sender
181
+ . send ( public_event)
182
+ . await
183
+ . unwrap_or_else ( |e| error ! ( "event sync error: {}" , e) ) ;
184
+ }
185
+ }
186
+ Err ( e) => {
179
187
error ! ( "{}" , & e) ;
180
- e
181
- } )
182
- . and ( public_event_result)
183
- {
184
- status_report_sender
185
- . send ( public_event)
186
- . await
187
- . unwrap_or_else ( |e| print ! ( "{}" , e) ) ;
188
+ }
189
+ _ => { }
188
190
}
189
191
}
190
192
return ;
@@ -199,33 +201,35 @@ impl EventHandle {
199
201
}
200
202
}
201
203
202
- pub ( crate ) async fn event_dispatch ( & mut self , event : TimerEvent ) -> Result < ( ) > {
204
+ pub ( crate ) async fn event_dispatch ( & mut self , event : TimerEvent ) -> Result < bool > {
203
205
match event {
204
206
TimerEvent :: StopTimer => {
205
207
self . shared_header . shared_motivation . store ( false , Release ) ;
206
- Ok ( ( ) )
208
+ Ok ( true )
207
209
}
208
210
209
- TimerEvent :: AddTask ( task) => self
210
- . add_task ( task)
211
- . map ( |task_mark| self . record_task_mark ( task_mark) ) ,
211
+ TimerEvent :: AddTask ( task) => self . add_task ( task) . map ( |task_mark| {
212
+ self . record_task_mark ( task_mark) ;
213
+ true
214
+ } ) ,
212
215
213
216
TimerEvent :: InsertTask ( task, task_instances_chain_maintainer) => {
214
217
self . add_task ( task) . map ( |mut task_mark| {
215
218
task_mark. set_task_instances_chain_maintainer ( task_instances_chain_maintainer) ;
216
219
self . record_task_mark ( task_mark) ;
220
+ true
217
221
} )
218
222
}
219
223
220
224
TimerEvent :: UpdateTask ( task) => {
221
225
self . update_task ( task) . await ;
222
- Ok ( ( ) )
226
+ Ok ( true )
223
227
}
224
228
225
- TimerEvent :: AdvanceTask ( task_id) => self . advance_task ( task_id) . await ,
229
+ TimerEvent :: AdvanceTask ( task_id) => self . advance_task ( task_id) . await . map ( |_| true ) ,
226
230
227
231
TimerEvent :: RemoveTask ( task_id) => {
228
- let remove_result = self . remove_task ( task_id) . await ;
232
+ let remove_result = self . remove_task ( task_id) . await . map ( |_| true ) ;
229
233
230
234
self . shared_header . task_flag_map . remove ( & task_id) ;
231
235
remove_result
@@ -234,24 +238,27 @@ impl EventHandle {
234
238
self . cancel_task :: < true > ( task_id, record_id, state:: instance:: CANCELLED )
235
239
}
236
240
241
+ // FIXED:
242
+ // When the `TimeoutTask` event fails to remove the handle, Ok(()) is returned by default.
243
+ // This causes the `TimeoutTask` event to be sent to the outside world by `status_report_sender`,
244
+ // Which is a buggy behavior.
245
+
246
+ // Redesign the return value: Result<()> -> Result<bool>
247
+ // Ok(_) & Err(_) for Result, means whether the processing is successful or not.
248
+ // `bool` means whether to synchronize the event to external.
237
249
TimerEvent :: TimeoutTask ( task_id, record_id) => {
238
250
self . cancel_task :: < false > ( task_id, record_id, state:: instance:: TIMEOUT )
239
251
}
240
252
241
253
TimerEvent :: AppendTaskHandle ( task_id, delay_task_handler_box) => {
242
254
self . maintain_task_status ( task_id, delay_task_handler_box)
243
255
. await ;
244
- Ok ( ( ) )
256
+ Ok ( true )
245
257
}
246
258
247
259
TimerEvent :: FinishTask ( FinishTaskBody {
248
260
task_id, record_id, ..
249
- } ) => {
250
- //TODO: maintain a outside-task-handle , through it pass the _finish_time and final-state.
251
- // Provide a separate start time for the external, record_id time with a delay.
252
- // Or use snowflake.real_time to generate record_id , so you don't have to add a separate field.
253
- self . finish_task ( task_id, record_id)
254
- }
261
+ } ) => self . finish_task ( task_id, record_id) . map ( |_| true ) ,
255
262
}
256
263
}
257
264
@@ -399,7 +406,7 @@ impl EventHandle {
399
406
task_id : u64 ,
400
407
record_id : i64 ,
401
408
state : usize ,
402
- ) -> Result < ( ) > {
409
+ ) -> Result < bool > {
403
410
// The cancellation operation is executed first, and then the outside world is notified of the cancellation event.
404
411
// If the operation object does not exist in the middle, it should return early.
405
412
@@ -409,7 +416,7 @@ impl EventHandle {
409
416
if INITIATIVE {
410
417
quit_result?;
411
418
} else {
412
- return Ok ( ( ) ) ;
419
+ return Ok ( false ) ;
413
420
}
414
421
}
415
422
@@ -423,7 +430,7 @@ impl EventHandle {
423
430
task_mark. notify_cancel_finish ( record_id, state) ?;
424
431
}
425
432
426
- return Ok ( ( ) ) ;
433
+ return Ok ( true ) ;
427
434
}
428
435
429
436
if INITIATIVE {
@@ -434,7 +441,7 @@ impl EventHandle {
434
441
state
435
442
) )
436
443
} else {
437
- Ok ( ( ) )
444
+ Ok ( false )
438
445
}
439
446
}
440
447
0 commit comments