Skip to content

Commit 3d24070

Browse files
bjarki-andreasenpetejohanson
authored andcommitted
kernel: work: add k_work_queue_run()
Add the ability to block and process a work queue by invoking `k_work_queue_run` from an existing thread. This can be particularly useful for using the main thread to process work items, in the same vein as the system work queue, but from a lower priority/preemptible thread. Signed-off-by: Bjarki Arge Andreasen <bjarki.andreasen@nordicsemi.no> Co-authored-by: Peter Johanson <peter@peterjohanson.com>
1 parent 2c69e7c commit 3d24070

File tree

4 files changed

+136
-12
lines changed

4 files changed

+136
-12
lines changed

include/zephyr/kernel.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3635,6 +3635,18 @@ void k_work_queue_start(struct k_work_q *queue,
36353635
k_thread_stack_t *stack, size_t stack_size,
36363636
int prio, const struct k_work_queue_config *cfg);
36373637

3638+
/** @brief Run work queue using calling thread
3639+
*
3640+
* This will run the work queue forever unless stopped by @ref k_work_queue_stop.
3641+
*
3642+
* @param queue the queue to run
3643+
*
3644+
* @param cfg optional additional configuration parameters. Pass @c
3645+
* NULL if not required, to use the defaults documented in
3646+
* k_work_queue_config.
3647+
*/
3648+
void k_work_queue_run(struct k_work_q *queue, const struct k_work_queue_config *cfg);
3649+
36383650
/** @brief Access the thread that animates a work queue.
36393651
*
36403652
* This is necessary to grant a work queue thread access to things the work
@@ -4214,6 +4226,11 @@ struct k_work_q {
42144226
/* The thread that animates the work. */
42154227
struct k_thread thread;
42164228

4229+
/* The thread ID that animates the work. This may be an external thread
4230+
* if k_work_queue_run() is used.
4231+
*/
4232+
k_tid_t thread_id;
4233+
42174234
/* All the following fields must be accessed only while the
42184235
* work module spinlock is held.
42194236
*/
@@ -4264,7 +4281,7 @@ static inline k_ticks_t k_work_delayable_remaining_get(
42644281

42654282
static inline k_tid_t k_work_queue_thread_get(struct k_work_q *queue)
42664283
{
4267-
return &queue->thread;
4284+
return queue->thread_id;
42684285
}
42694286

42704287
/** @} */

kernel/work.c

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ static inline int queue_submit_locked(struct k_work_q *queue,
262262
}
263263

264264
int ret;
265-
bool chained = (_current == &queue->thread) && !k_is_in_isr();
265+
bool chained = (_current == queue->thread_id) && !k_is_in_isr();
266266
bool draining = flag_test(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
267267
bool plugged = flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);
268268

@@ -722,6 +722,28 @@ void k_work_queue_init(struct k_work_q *queue)
722722
SYS_PORT_TRACING_OBJ_INIT(k_work_queue, queue);
723723
}
724724

725+
void k_work_queue_run(struct k_work_q *queue, const struct k_work_queue_config *cfg)
726+
{
727+
__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));
728+
729+
uint32_t flags = K_WORK_QUEUE_STARTED;
730+
731+
if ((cfg != NULL) && cfg->no_yield) {
732+
flags |= K_WORK_QUEUE_NO_YIELD;
733+
}
734+
735+
if ((cfg != NULL) && (cfg->name != NULL)) {
736+
k_thread_name_set(_current, cfg->name);
737+
}
738+
739+
sys_slist_init(&queue->pending);
740+
z_waitq_init(&queue->notifyq);
741+
z_waitq_init(&queue->drainq);
742+
queue->thread_id = _current;
743+
flags_set(&queue->flags, flags);
744+
work_queue_main(queue, NULL, NULL);
745+
}
746+
725747
void k_work_queue_start(struct k_work_q *queue,
726748
k_thread_stack_t *stack,
727749
size_t stack_size,
@@ -731,6 +753,7 @@ void k_work_queue_start(struct k_work_q *queue,
731753
__ASSERT_NO_MSG(queue);
732754
__ASSERT_NO_MSG(stack);
733755
__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));
756+
734757
uint32_t flags = K_WORK_QUEUE_STARTED;
735758

736759
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, start, queue);
@@ -762,6 +785,7 @@ void k_work_queue_start(struct k_work_q *queue,
762785
}
763786

764787
k_thread_start(&queue->thread);
788+
queue->thread_id = &queue->thread;
765789

766790
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, start, queue);
767791
}
@@ -841,7 +865,7 @@ int k_work_queue_stop(struct k_work_q *queue, k_timeout_t timeout)
841865
notify_queue_locked(queue);
842866
k_spin_unlock(&lock, key);
843867
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work_queue, stop, queue, timeout);
844-
if (k_thread_join(&queue->thread, timeout)) {
868+
if (k_thread_join(queue->thread_id, timeout)) {
845869
key = k_spin_lock(&lock);
846870
flag_clear(&queue->flags, K_WORK_QUEUE_STOP_BIT);
847871
k_spin_unlock(&lock, key);

tests/kernel/workq/work/src/main.c

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ static inline int coophi_counter(void)
9595
}
9696

9797
static K_THREAD_STACK_DEFINE(cooplo_stack, STACK_SIZE);
98+
static struct k_thread cooplo_thread;
9899
static struct k_work_q cooplo_queue;
99100
static atomic_t cooplo_ctr;
100101
static inline int cooplo_counter(void)
@@ -141,13 +142,13 @@ static inline void reset_counters(void)
141142
static void counter_handler(struct k_work *work)
142143
{
143144
last_handle_ms = k_uptime_get_32();
144-
if (k_current_get() == &coophi_queue.thread) {
145+
if (k_current_get() == coophi_queue.thread_id) {
145146
atomic_inc(&coophi_ctr);
146-
} else if (k_current_get() == &k_sys_work_q.thread) {
147+
} else if (k_current_get() == k_sys_work_q.thread_id) {
147148
atomic_inc(&system_ctr);
148-
} else if (k_current_get() == &cooplo_queue.thread) {
149+
} else if (k_current_get() == cooplo_queue.thread_id) {
149150
atomic_inc(&cooplo_ctr);
150-
} else if (k_current_get() == &preempt_queue.thread) {
151+
} else if (k_current_get() == preempt_queue.thread_id) {
151152
atomic_inc(&preempt_ctr);
152153
}
153154
if (atomic_dec(&resubmits_left) > 0) {
@@ -221,6 +222,21 @@ ZTEST(work, test_unstarted)
221222
zassert_equal(rc, -ENODEV);
222223
}
223224

225+
static void cooplo_main(void *workq_ptr, void *p2, void *p3)
226+
{
227+
ARG_UNUSED(p2);
228+
ARG_UNUSED(p3);
229+
230+
struct k_work_q *queue = (struct k_work_q *)workq_ptr;
231+
232+
struct k_work_queue_config cfg = {
233+
.name = "wq.cooplo",
234+
.no_yield = true,
235+
};
236+
237+
k_work_queue_run(queue, &cfg);
238+
}
239+
224240
static void test_queue_start(void)
225241
{
226242
struct k_work_queue_config cfg = {
@@ -261,10 +277,14 @@ static void test_queue_start(void)
261277
zassert_equal(coophi_queue.flags,
262278
K_WORK_QUEUE_STARTED | K_WORK_QUEUE_NO_YIELD, NULL);
263279

264-
cfg.name = "wq.cooplo";
265-
cfg.no_yield = true;
266-
k_work_queue_start(&cooplo_queue, cooplo_stack, STACK_SIZE,
267-
COOPLO_PRIORITY, &cfg);
280+
(void)k_thread_create(&cooplo_thread, cooplo_stack, STACK_SIZE, cooplo_main, &cooplo_queue,
281+
NULL, NULL, COOPLO_PRIORITY, 0, K_FOREVER);
282+
283+
k_thread_start(&cooplo_thread);
284+
285+
/* Be sure the cooplo_thread has a chance to start running */
286+
k_msleep(1);
287+
268288
zassert_equal(cooplo_queue.flags,
269289
K_WORK_QUEUE_STARTED | K_WORK_QUEUE_NO_YIELD, NULL);
270290
}

tests/kernel/workq/work_queue/src/start_stop.c

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ static void work_handler(struct k_work *work)
2121
k_msleep(CONFIG_TEST_WORK_ITEM_WAIT_MS);
2222
}
2323

24-
ZTEST(workqueue_api, test_k_work_queue_stop)
24+
ZTEST(workqueue_api, test_k_work_queue_start_stop)
2525
{
2626
size_t i;
2727
struct k_work work;
@@ -56,4 +56,67 @@ ZTEST(workqueue_api, test_k_work_queue_stop)
5656
"Succeeded to submit work item to non-initialized work queue");
5757
}
5858

59+
#define STACK_SIZE (1024 + CONFIG_TEST_EXTRA_STACK_SIZE)
60+
61+
static K_THREAD_STACK_DEFINE(run_stack, STACK_SIZE);
62+
63+
static void run_q_main(void *workq_ptr, void *sem_ptr, void *p3)
64+
{
65+
ARG_UNUSED(p3);
66+
67+
struct k_work_q *queue = (struct k_work_q *)workq_ptr;
68+
struct k_sem *sem = (struct k_sem *)sem_ptr;
69+
70+
struct k_work_queue_config cfg = {
71+
.name = "wq.run_q",
72+
.no_yield = true,
73+
};
74+
75+
k_work_queue_run(queue, &cfg);
76+
77+
k_sem_give(sem);
78+
}
79+
80+
ZTEST(workqueue_api, test_k_work_queue_run_stop)
81+
{
82+
int rc;
83+
size_t i;
84+
struct k_thread thread;
85+
struct k_work work;
86+
struct k_work_q work_q = {0};
87+
struct k_work works[NUM_TEST_ITEMS];
88+
struct k_sem ret_sem;
89+
90+
k_sem_init(&ret_sem, 0, 1);
91+
92+
(void)k_thread_create(&thread, run_stack, STACK_SIZE, run_q_main, &work_q, &ret_sem, NULL,
93+
K_PRIO_COOP(3), 0, K_FOREVER);
94+
95+
k_thread_start(&thread);
96+
97+
k_sleep(K_MSEC(CHECK_WAIT));
98+
99+
for (i = 0; i < NUM_TEST_ITEMS; i++) {
100+
k_work_init(&works[i], work_handler);
101+
zassert_equal(k_work_submit_to_queue(&work_q, &works[i]), 1,
102+
"Failed to submit work item");
103+
}
104+
105+
/* Wait for the work item to complete */
106+
k_sleep(K_MSEC(CHECK_WAIT));
107+
108+
zassert_equal(k_work_queue_stop(&work_q, K_FOREVER), -EBUSY,
109+
"Succeeded to stop work queue while it is running & not plugged");
110+
zassert_true(k_work_queue_drain(&work_q, true) >= 0, "Failed to drain & plug work queue");
111+
zassert_ok(k_work_queue_stop(&work_q, K_FOREVER), "Failed to stop work queue");
112+
113+
k_work_init(&work, work_handler);
114+
zassert_equal(k_work_submit_to_queue(&work_q, &work), -ENODEV,
115+
"Succeeded to submit work item to non-initialized work queue");
116+
117+
/* Take the semaphore the other thread released once done running the queue */
118+
rc = k_sem_take(&ret_sem, K_MSEC(1));
119+
zassert_equal(rc, 0);
120+
}
121+
59122
ZTEST_SUITE(workqueue_api, NULL, NULL, NULL, NULL, NULL);

0 commit comments

Comments
 (0)