@@ -48,7 +48,8 @@ struct io_worker {
 	struct io_wqe *wqe;
 
 	struct io_wq_work *cur_work;
-	spinlock_t lock;
+	struct io_wq_work *next_work;
+	raw_spinlock_t lock;
 
 	struct completion ref_done;
 
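The two pointers above are meant to be read together: cur_work is the item the worker is actively executing, while next_work holds an item that has already been pulled off the wqe lists but has not yet become active. Both are protected by worker->lock, which is converted to a raw spinlock here, presumably because the io_worker_handle_work() hunk further down now takes it while the already-raw wqe->lock is held; a plain spinlock_t is a sleeping lock on PREEMPT_RT and could not nest there. An abbreviated sketch of the resulting struct, with illustrative comments (the elided members are unchanged by this patch):

/* Abbreviated view; the comments and elisions are editorial, not from the source. */
struct io_worker {
        /* ... */
        struct io_wqe *wqe;

        struct io_wq_work *cur_work;    /* item currently being executed */
        struct io_wq_work *next_work;   /* item claimed, not yet cur_work */
        raw_spinlock_t lock;            /* protects cur_work and next_work;
                                         * nests inside wqe->lock */

        struct completion ref_done;
        /* ... */
};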
@@ -405,8 +406,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
  * Worker will start processing some work. Move it to the busy list, if
  * it's currently on the freelist
  */
-static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
-                             struct io_wq_work *work)
+static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
 	__must_hold(wqe->lock)
 {
 	if (worker->flags & IO_WORKER_F_FREE) {
@@ -529,9 +529,10 @@ static void io_assign_current_work(struct io_worker *worker,
 		cond_resched();
 	}
 
-	spin_lock(&worker->lock);
+	raw_spin_lock(&worker->lock);
 	worker->cur_work = work;
-	spin_unlock(&worker->lock);
+	worker->next_work = NULL;
+	raw_spin_unlock(&worker->lock);
 }
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
@@ -546,7 +547,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 
 	do {
 		struct io_wq_work *work;
-get_next:
+
 		/*
 		 * If we got some work, mark us as busy. If we didn't, but
 		 * the list isn't empty, it means we stalled on hashed work.
@@ -555,9 +556,20 @@ static void io_worker_handle_work(struct io_worker *worker)
 		 * clear the stalled flag.
 		 */
 		work = io_get_next_work(acct, worker);
-		if (work)
-			__io_worker_busy(wqe, worker, work);
-
+		if (work) {
+			__io_worker_busy(wqe, worker);
+
+			/*
+			 * Make sure cancelation can find this, even before
+			 * it becomes the active work. That avoids a window
+			 * where the work has been removed from our general
+			 * work list, but isn't yet discoverable as the
+			 * current work item for this worker.
+			 */
+			raw_spin_lock(&worker->lock);
+			worker->next_work = work;
+			raw_spin_unlock(&worker->lock);
+		}
 		raw_spin_unlock(&wqe->lock);
 		if (!work)
 			break;
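This is the heart of the fix. io_get_next_work() removes the item from the acct work list, so before this change there was a window, after that removal but before io_assign_current_work() set cur_work, in which the item was findable neither by the pending-work scan nor by the running-work scan. Publishing it as next_work under worker->lock, while wqe->lock is still held, closes that window. Below is a hedged userspace analogue of the same publish-before-unlock pattern, for illustration only; every name in it (queue_lock, worker_lock, claim_work() and so on) is hypothetical and not part of io-wq:

#include <pthread.h>
#include <stddef.h>

struct work { struct work *link; int cancelled; };

static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;  /* outer lock */
static pthread_mutex_t worker_lock = PTHREAD_MUTEX_INITIALIZER; /* inner lock */
static struct work *queue_head;   /* pending list, protected by queue_lock   */
static struct work *next_work;    /* claimed item, protected by worker_lock  */
static struct work *cur_work;     /* active item, protected by worker_lock   */

/* Worker side: publish the claimed item before the queue lock is dropped. */
static struct work *claim_work(void)
{
        struct work *w;

        pthread_mutex_lock(&queue_lock);
        w = queue_head;
        if (w) {
                queue_head = w->link;             /* off the list now...     */
                pthread_mutex_lock(&worker_lock);
                next_work = w;                    /* ...but already findable */
                pthread_mutex_unlock(&worker_lock);
        }
        pthread_mutex_unlock(&queue_lock);
        return w;
}

/* Worker side: promote the claimed item to the active one. */
static void activate_work(struct work *w)
{
        pthread_mutex_lock(&worker_lock);
        cur_work = w;
        next_work = NULL;
        pthread_mutex_unlock(&worker_lock);
}

/* Cancel side: with both locks held, the item cannot hide between states. */
static int cancel_work(struct work *target)
{
        int found = 0;

        pthread_mutex_lock(&queue_lock);
        /* (a real implementation would also scan the queue_head list here) */
        pthread_mutex_lock(&worker_lock);
        if (cur_work == target || next_work == target) {
                target->cancelled = 1;
                found = 1;
        }
        pthread_mutex_unlock(&worker_lock);
        pthread_mutex_unlock(&queue_lock);
        return found;
}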
@@ -594,11 +606,6 @@ static void io_worker_handle_work(struct io_worker *worker)
 			spin_unlock_irq(&wq->hash->wait.lock);
 			if (wq_has_sleeper(&wq->hash->wait))
 				wake_up(&wq->hash->wait);
-			raw_spin_lock(&wqe->lock);
-			/* skip unnecessary unlock-lock wqe->lock */
-			if (!work)
-				goto get_next;
-			raw_spin_unlock(&wqe->lock);
 		}
 	} while (work);
 
@@ -815,7 +822,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 
 	refcount_set(&worker->ref, 1);
 	worker->wqe = wqe;
-	spin_lock_init(&worker->lock);
+	raw_spin_lock_init(&worker->lock);
 	init_completion(&worker->ref_done);
 
 	if (index == IO_WQ_ACCT_BOUND)
@@ -973,6 +980,19 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
 	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
 }
 
+static bool __io_wq_worker_cancel(struct io_worker *worker,
+                                  struct io_cb_cancel_data *match,
+                                  struct io_wq_work *work)
+{
+	if (work && match->fn(work, match->data)) {
+		work->flags |= IO_WQ_WORK_CANCEL;
+		set_notify_signal(worker->task);
+		return true;
+	}
+
+	return false;
+}
+
 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 {
 	struct io_cb_cancel_data *match = data;
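The new helper centralizes what the cancellation path does to a matching item: flag it with IO_WQ_WORK_CANCEL and prod the worker via set_notify_signal(), so the worker notices the request the next time it checks for signals. The match->fn / match->data pair comes from io_cb_cancel_data; a minimal sketch of a callback with the expected shape (the name is illustrative, although io_uring carries very similar match helpers):

/* Illustrative match callback: cancel exactly the work item passed as data. */
static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
        return work == data;
}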
@@ -981,13 +1001,11 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 	 * Hold the lock to avoid ->cur_work going out of scope, caller
 	 * may dereference the passed in work.
 	 */
-	spin_lock(&worker->lock);
-	if (worker->cur_work &&
-	    match->fn(worker->cur_work, match->data)) {
-		set_notify_signal(worker->task);
+	raw_spin_lock(&worker->lock);
+	if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
+	    __io_wq_worker_cancel(worker, match, worker->next_work))
 		match->nr_running++;
-	}
-	spin_unlock(&worker->lock);
+	raw_spin_unlock(&worker->lock);
 
 	return match->nr_running && !match->cancel_all;
 }
@@ -1039,17 +1057,16 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 {
 	int i;
 retry:
-	raw_spin_lock(&wqe->lock);
 	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
 		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
 
 		if (io_acct_cancel_pending_work(wqe, acct, match)) {
+			raw_spin_lock(&wqe->lock);
 			if (match->cancel_all)
 				goto retry;
-			return;
+			break;
 		}
 	}
-	raw_spin_unlock(&wqe->lock);
 }
 
 static void io_wqe_cancel_running_work(struct io_wqe *wqe,
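Note the asymmetry in this hunk: the function no longer takes wqe->lock itself but expects the caller to hold it, and it re-acquires the lock only after io_acct_cancel_pending_work() reports a hit, because that helper (not visible in this diff) drops wqe->lock internally when it actually cancels an entry. Either way the lock is held again when the function returns. A hedged caller-side sketch of the resulting convention; the wrapper name is illustrative and the real call sites are the io_wq_cancel_cb() and io_wq_destroy() hunks below:

/* Illustrative wrapper only, showing the lock transitions around the call. */
static void cancel_pending_on_wqe(struct io_wqe *wqe,
                                  struct io_cb_cancel_data *match)
{
        raw_spin_lock(&wqe->lock);
        io_wqe_cancel_pending_work(wqe, match);  /* may drop and re-take wqe->lock */
        raw_spin_unlock(&wqe->lock);
}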
@@ -1074,25 +1091,27 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
 	 * First check pending list, if we're lucky we can just remove it
 	 * from there. CANCEL_OK means that the work is returned as-new,
 	 * no completion will be posted for it.
-	 */
-	for_each_node(node) {
-		struct io_wqe *wqe = wq->wqes[node];
-
-		io_wqe_cancel_pending_work(wqe, &match);
-		if (match.nr_pending && !match.cancel_all)
-			return IO_WQ_CANCEL_OK;
-	}
-
-	/*
-	 * Now check if a free (going busy) or busy worker has the work
+	 *
+	 * Then check if a free (going busy) or busy worker has the work
 	 * currently running. If we find it there, we'll return CANCEL_RUNNING
 	 * as an indication that we attempt to signal cancellation. The
 	 * completion will run normally in this case.
+	 *
+	 * Do both of these while holding the wqe->lock, to ensure that
+	 * we'll find a work item regardless of state.
 	 */
 	for_each_node(node) {
 		struct io_wqe *wqe = wq->wqes[node];
 
+		raw_spin_lock(&wqe->lock);
+		io_wqe_cancel_pending_work(wqe, &match);
+		if (match.nr_pending && !match.cancel_all) {
+			raw_spin_unlock(&wqe->lock);
+			return IO_WQ_CANCEL_OK;
+		}
+
 		io_wqe_cancel_running_work(wqe, &match);
+		raw_spin_unlock(&wqe->lock);
 		if (match.nr_running && !match.cancel_all)
 			return IO_WQ_CANCEL_RUNNING;
 	}
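With both passes now done under a single wqe->lock acquisition (as the updated comment says), a work item can no longer slip from the pending list over to a worker's next_work or cur_work between the two scans and be missed by both. A hedged usage sketch of the entry point, using the io_wq_cancel_cb() signature from the hunk header above and the illustrative io_wq_work_match_item() from earlier, and assuming the remaining parameters are the match data and a cancel_all flag as in mainline io-wq; the wrapper name is hypothetical:

/* Try to cancel one specific work item; stop after the first match. */
static enum io_wq_cancel cancel_one_work(struct io_wq *wq,
                                         struct io_wq_work *work)
{
        return io_wq_cancel_cb(wq, io_wq_work_match_item, work, false);
}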
@@ -1263,7 +1282,9 @@ static void io_wq_destroy(struct io_wq *wq)
 			.fn		= io_wq_work_match_all,
 			.cancel_all	= true,
 		};
+		raw_spin_lock(&wqe->lock);
 		io_wqe_cancel_pending_work(wqe, &match);
+		raw_spin_unlock(&wqe->lock);
 		free_cpumask_var(wqe->cpu_mask);
 		kfree(wqe);
 	}