Skip to content

Commit 66bed1b

Browse files
kernel: work: add k_work_queue_run()
Add the ability to block and process a work queue by invoking `k_work_queue_run` from an existing thread. This can be particularly useful for using the main thread to process work items, in the same vein as the system work queue, but from a lower-priority/preemptible thread.

Signed-off-by: Bjarki Arge Andreasen <bjarki.andreasen@nordicsemi.no>
Co-authored-by: Peter Johanson <peter@peterjohanson.com>
1 parent f47f3fe commit 66bed1b

File tree

3 files changed

+62
-9
lines changed

3 files changed

+62
-9
lines changed

include/zephyr/kernel.h

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3625,6 +3625,18 @@ void k_work_queue_start(struct k_work_q *queue,
36253625
k_thread_stack_t *stack, size_t stack_size,
36263626
int prio, const struct k_work_queue_config *cfg);
36273627

3628+
/** @brief Run work queue using calling thread
 *
 * This will run the work queue forever, hence the function never
 * returns. The queue must not already have been started (this is
 * asserted), e.g. via k_work_queue_start().
 *
 * @param queue the queue to run
 *
 * @param cfg optional additional configuration parameters. Pass @c
 * NULL if not required, to use the defaults documented in
 * k_work_queue_config. If @p cfg->name is provided it is assigned as
 * the name of the calling thread.
 */
void k_work_queue_run(struct k_work_q *queue, const struct k_work_queue_config *cfg);
3639+
36283640
/** @brief Access the thread that animates a work queue.
36293641
*
36303642
* This is necessary to grant a work queue thread access to things the work

kernel/work.c

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ static inline int queue_submit_locked(struct k_work_q *queue,
262262
}
263263

264264
int ret;
265-
bool chained = (_current == &queue->thread) && !k_is_in_isr();
265+
bool chained = (_current == queue->thread_id) && !k_is_in_isr();
266266
bool draining = flag_test(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
267267
bool plugged = flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);
268268

@@ -722,6 +722,28 @@ void k_work_queue_init(struct k_work_q *queue)
722722
SYS_PORT_TRACING_OBJ_INIT(k_work_queue, queue);
723723
}
724724

725+
void k_work_queue_run(struct k_work_q *queue, const struct k_work_queue_config *cfg)
726+
{
727+
__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));
728+
729+
uint32_t flags = K_WORK_QUEUE_STARTED;
730+
731+
if ((cfg != NULL) && cfg->no_yield) {
732+
flags |= K_WORK_QUEUE_NO_YIELD;
733+
}
734+
735+
if ((cfg != NULL) && (cfg->name != NULL)) {
736+
k_thread_name_set(_current, cfg->name);
737+
}
738+
739+
sys_slist_init(&queue->pending);
740+
z_waitq_init(&queue->notifyq);
741+
z_waitq_init(&queue->drainq);
742+
queue->thread_id = _current;
743+
flags_set(&queue->flags, flags);
744+
work_queue_main(queue, NULL, NULL);
745+
}
746+
725747
void k_work_queue_start(struct k_work_q *queue,
726748
k_thread_stack_t *stack,
727749
size_t stack_size,
@@ -731,6 +753,7 @@ void k_work_queue_start(struct k_work_q *queue,
731753
__ASSERT_NO_MSG(queue);
732754
__ASSERT_NO_MSG(stack);
733755
__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));
756+
734757
uint32_t flags = K_WORK_QUEUE_STARTED;
735758

736759
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, start, queue);
@@ -762,6 +785,7 @@ void k_work_queue_start(struct k_work_q *queue,
762785
}
763786

764787
k_thread_start(&queue->thread);
788+
queue->thread_id = &queue->thread;
765789

766790
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, start, queue);
767791
}

tests/kernel/workq/work/src/main.c

Lines changed: 25 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -95,6 +95,7 @@ static inline int coophi_counter(void)
9595
}
9696

9797
static K_THREAD_STACK_DEFINE(cooplo_stack, STACK_SIZE);
98+
static struct k_thread cooplo_thread;
9899
static struct k_work_q cooplo_queue;
99100
static atomic_t cooplo_ctr;
100101
static inline int cooplo_counter(void)
@@ -141,13 +142,13 @@ static inline void reset_counters(void)
141142
static void counter_handler(struct k_work *work)
142143
{
143144
last_handle_ms = k_uptime_get_32();
144-
if (k_current_get() == &coophi_queue.thread) {
145+
if (k_current_get() == coophi_queue.thread_id) {
145146
atomic_inc(&coophi_ctr);
146-
} else if (k_current_get() == &k_sys_work_q.thread) {
147+
} else if (k_current_get() == k_sys_work_q.thread_id) {
147148
atomic_inc(&system_ctr);
148-
} else if (k_current_get() == &cooplo_queue.thread) {
149+
} else if (k_current_get() == cooplo_queue.thread_id) {
149150
atomic_inc(&cooplo_ctr);
150-
} else if (k_current_get() == &preempt_queue.thread) {
151+
} else if (k_current_get() == preempt_queue.thread_id) {
151152
atomic_inc(&preempt_ctr);
152153
}
153154
if (atomic_dec(&resubmits_left) > 0) {
@@ -221,6 +222,21 @@ ZTEST(work, test_unstarted)
221222
zassert_equal(rc, -ENODEV);
222223
}
223224

225+
static void cooplo_main(void *workq_ptr, void *p2, void *p3)
226+
{
227+
ARG_UNUSED(p2);
228+
ARG_UNUSED(p3);
229+
230+
struct k_work_q *queue = (struct k_work_q *)workq_ptr;
231+
232+
struct k_work_queue_config cfg = {
233+
.name = "wq.cooplo",
234+
.no_yield = true,
235+
};
236+
237+
k_work_queue_run(queue, &cfg);
238+
}
239+
224240
static void test_queue_start(void)
225241
{
226242
struct k_work_queue_config cfg = {
@@ -261,10 +277,11 @@ static void test_queue_start(void)
261277
zassert_equal(coophi_queue.flags,
262278
K_WORK_QUEUE_STARTED | K_WORK_QUEUE_NO_YIELD, NULL);
263279

264-
cfg.name = "wq.cooplo";
265-
cfg.no_yield = true;
266-
k_work_queue_start(&cooplo_queue, cooplo_stack, STACK_SIZE,
267-
COOPLO_PRIORITY, &cfg);
280+
(void)k_thread_create(&cooplo_thread, cooplo_stack, STACK_SIZE, cooplo_main, &cooplo_queue,
281+
NULL, NULL, COOPLO_PRIORITY, 0, K_FOREVER);
282+
283+
k_thread_start(&cooplo_thread);
284+
268285
zassert_equal(cooplo_queue.flags,
269286
K_WORK_QUEUE_STARTED | K_WORK_QUEUE_NO_YIELD, NULL);
270287
}

0 commit comments

Comments
 (0)