Skip to content

kernel: work: add k_work_queue_run() #90746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion include/zephyr/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -3635,6 +3635,18 @@ void k_work_queue_start(struct k_work_q *queue,
k_thread_stack_t *stack, size_t stack_size,
int prio, const struct k_work_queue_config *cfg);

/** @brief Run work queue using calling thread
*
* This will run the work queue forever unless stopped by @ref k_work_queue_stop.
*
* @param queue the queue to run
*
* @param cfg optional additional configuration parameters. Pass
* @c NULL if not required, to use the defaults documented in
* k_work_queue_config.
*/
void k_work_queue_run(struct k_work_q *queue, const struct k_work_queue_config *cfg);

/** @brief Access the thread that animates a work queue.
*
* This is necessary to grant a work queue thread access to things the work
Expand Down Expand Up @@ -4214,6 +4226,11 @@ struct k_work_q {
/* The thread that animates the work. */
struct k_thread thread;

/* The thread ID that animates the work. This may be an external thread
* if k_work_queue_run() is used.
*/
k_tid_t thread_id;

/* All the following fields must be accessed only while the
* work module spinlock is held.
*/
Expand Down Expand Up @@ -4264,7 +4281,7 @@ static inline k_ticks_t k_work_delayable_remaining_get(

static inline k_tid_t k_work_queue_thread_get(struct k_work_q *queue)
{
	/* thread_id holds the dedicated thread for queues started via
	 * k_work_queue_start(), or the external caller's thread when the
	 * queue is animated by k_work_queue_run().
	 */
	return queue->thread_id;
}

/** @} */
Expand Down
28 changes: 26 additions & 2 deletions kernel/work.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ static inline int queue_submit_locked(struct k_work_q *queue,
}

int ret;
bool chained = (_current == &queue->thread) && !k_is_in_isr();
bool chained = (_current == queue->thread_id) && !k_is_in_isr();
bool draining = flag_test(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
bool plugged = flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);

Expand Down Expand Up @@ -722,6 +722,28 @@ void k_work_queue_init(struct k_work_q *queue)
SYS_PORT_TRACING_OBJ_INIT(k_work_queue, queue);
}

/* Run a work queue in the context of the calling thread.
 *
 * Initializes the queue state, publishes the caller as the queue's
 * animating thread, and then enters work_queue_main(), which does not
 * return until the queue is stopped via k_work_queue_stop().
 *
 * @param queue the work queue to run; must not already be started
 * @param cfg optional configuration; pass NULL for the defaults
 */
void k_work_queue_run(struct k_work_q *queue, const struct k_work_queue_config *cfg)
{
	/* Match the argument validation done by k_work_queue_start(). */
	__ASSERT_NO_MSG(queue != NULL);
	__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));

	uint32_t flags = K_WORK_QUEUE_STARTED;

	if (cfg != NULL) {
		if (cfg->no_yield) {
			flags |= K_WORK_QUEUE_NO_YIELD;
		}

		if (cfg->name != NULL) {
			k_thread_name_set(_current, cfg->name);
		}
	}

	sys_slist_init(&queue->pending);
	z_waitq_init(&queue->notifyq);
	z_waitq_init(&queue->drainq);

	/* Publish the animating thread before marking the queue started so
	 * submitters observe a consistent thread_id once STARTED is visible.
	 */
	queue->thread_id = _current;
	flags_set(&queue->flags, flags);
	work_queue_main(queue, NULL, NULL);
}

void k_work_queue_start(struct k_work_q *queue,
k_thread_stack_t *stack,
size_t stack_size,
Expand All @@ -731,6 +753,7 @@ void k_work_queue_start(struct k_work_q *queue,
__ASSERT_NO_MSG(queue);
__ASSERT_NO_MSG(stack);
__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));

uint32_t flags = K_WORK_QUEUE_STARTED;

SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, start, queue);
Expand Down Expand Up @@ -762,6 +785,7 @@ void k_work_queue_start(struct k_work_q *queue,
}

k_thread_start(&queue->thread);
queue->thread_id = &queue->thread;

SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, start, queue);
}
Expand Down Expand Up @@ -841,7 +865,7 @@ int k_work_queue_stop(struct k_work_q *queue, k_timeout_t timeout)
notify_queue_locked(queue);
k_spin_unlock(&lock, key);
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work_queue, stop, queue, timeout);
if (k_thread_join(&queue->thread, timeout)) {
if (k_thread_join(queue->thread_id, timeout)) {
key = k_spin_lock(&lock);
flag_clear(&queue->flags, K_WORK_QUEUE_STOP_BIT);
k_spin_unlock(&lock, key);
Expand Down
36 changes: 28 additions & 8 deletions tests/kernel/workq/work/src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ static inline int coophi_counter(void)
}

static K_THREAD_STACK_DEFINE(cooplo_stack, STACK_SIZE);
static struct k_thread cooplo_thread;
static struct k_work_q cooplo_queue;
static atomic_t cooplo_ctr;
static inline int cooplo_counter(void)
Expand Down Expand Up @@ -141,13 +142,13 @@ static inline void reset_counters(void)
static void counter_handler(struct k_work *work)
{
last_handle_ms = k_uptime_get_32();
if (k_current_get() == &coophi_queue.thread) {
if (k_current_get() == coophi_queue.thread_id) {
atomic_inc(&coophi_ctr);
} else if (k_current_get() == &k_sys_work_q.thread) {
} else if (k_current_get() == k_sys_work_q.thread_id) {
atomic_inc(&system_ctr);
} else if (k_current_get() == &cooplo_queue.thread) {
} else if (k_current_get() == cooplo_queue.thread_id) {
atomic_inc(&cooplo_ctr);
} else if (k_current_get() == &preempt_queue.thread) {
} else if (k_current_get() == preempt_queue.thread_id) {
atomic_inc(&preempt_ctr);
}
if (atomic_dec(&resubmits_left) > 0) {
Expand Down Expand Up @@ -221,6 +222,21 @@ ZTEST(work, test_unstarted)
zassert_equal(rc, -ENODEV);
}

static void cooplo_main(void *workq_ptr, void *p2, void *p3)
{
ARG_UNUSED(p2);
ARG_UNUSED(p3);

struct k_work_q *queue = (struct k_work_q *)workq_ptr;

struct k_work_queue_config cfg = {
.name = "wq.cooplo",
.no_yield = true,
};

k_work_queue_run(queue, &cfg);
}

static void test_queue_start(void)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is now not testing k_work_queue_start as the test name says? Maybe you should be adding a new test for k_work_queue_run instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair. I'll add a separate dedicated test for k_work_queue_run instead of mixing them together.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... that function is actually doing the setup for the other tests that then use the various queues for their tests. I have added a dedicated run/stop test (along with a small fix) to satisfy the request from @pdgendt to support k_work_queue_stop() properly. I'm happy to adjust this somehow as well, if you really want.

{
struct k_work_queue_config cfg = {
Expand Down Expand Up @@ -261,10 +277,14 @@ static void test_queue_start(void)
zassert_equal(coophi_queue.flags,
K_WORK_QUEUE_STARTED | K_WORK_QUEUE_NO_YIELD, NULL);

cfg.name = "wq.cooplo";
cfg.no_yield = true;
k_work_queue_start(&cooplo_queue, cooplo_stack, STACK_SIZE,
COOPLO_PRIORITY, &cfg);
(void)k_thread_create(&cooplo_thread, cooplo_stack, STACK_SIZE, cooplo_main, &cooplo_queue,
NULL, NULL, COOPLO_PRIORITY, 0, K_FOREVER);

k_thread_start(&cooplo_thread);

/* Be sure the cooplo_thread has a chance to start running */
k_msleep(1);

zassert_equal(cooplo_queue.flags,
K_WORK_QUEUE_STARTED | K_WORK_QUEUE_NO_YIELD, NULL);
}
Expand Down
65 changes: 64 additions & 1 deletion tests/kernel/workq/work_queue/src/start_stop.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static void work_handler(struct k_work *work)
k_msleep(CONFIG_TEST_WORK_ITEM_WAIT_MS);
}

ZTEST(workqueue_api, test_k_work_queue_stop)
ZTEST(workqueue_api, test_k_work_queue_start_stop)
{
size_t i;
struct k_work work;
Expand Down Expand Up @@ -56,4 +56,67 @@ ZTEST(workqueue_api, test_k_work_queue_stop)
"Succeeded to submit work item to non-initialized work queue");
}

#define STACK_SIZE (1024 + CONFIG_TEST_EXTRA_STACK_SIZE)

static K_THREAD_STACK_DEFINE(run_stack, STACK_SIZE);

static void run_q_main(void *workq_ptr, void *sem_ptr, void *p3)
{
ARG_UNUSED(p3);

struct k_work_q *queue = (struct k_work_q *)workq_ptr;
struct k_sem *sem = (struct k_sem *)sem_ptr;

struct k_work_queue_config cfg = {
.name = "wq.run_q",
.no_yield = true,
};

k_work_queue_run(queue, &cfg);

k_sem_give(sem);
}

/* Verify the k_work_queue_run()/k_work_queue_stop() lifecycle when the queue
 * is animated by an external thread instead of k_work_queue_start():
 * submissions succeed while running, stop fails until drained & plugged,
 * and submissions fail after stop.
 */
ZTEST(workqueue_api, test_k_work_queue_run_stop)
{
	int rc;
	size_t i;
	struct k_thread thread;
	struct k_work work;
	struct k_work_q work_q = {0};
	struct k_work works[NUM_TEST_ITEMS];
	struct k_sem ret_sem;

	/* Signaled by run_q_main() after k_work_queue_run() returns. */
	k_sem_init(&ret_sem, 0, 1);

	(void)k_thread_create(&thread, run_stack, STACK_SIZE, run_q_main, &work_q, &ret_sem, NULL,
			      K_PRIO_COOP(3), 0, K_FOREVER);

	k_thread_start(&thread);

	/* Give the runner thread time to initialize and start the queue. */
	k_sleep(K_MSEC(CHECK_WAIT));

	for (i = 0; i < NUM_TEST_ITEMS; i++) {
		k_work_init(&works[i], work_handler);
		zassert_equal(k_work_submit_to_queue(&work_q, &works[i]), 1,
			      "Failed to submit work item");
	}

	/* Wait for the work item to complete */
	k_sleep(K_MSEC(CHECK_WAIT));

	/* A running, unplugged queue must refuse to stop (-EBUSY); it can only
	 * be stopped after a drain that also plugs it.
	 */
	zassert_equal(k_work_queue_stop(&work_q, K_FOREVER), -EBUSY,
		      "Succeeded to stop work queue while it is running & not plugged");
	zassert_true(k_work_queue_drain(&work_q, true) >= 0, "Failed to drain & plug work queue");
	zassert_ok(k_work_queue_stop(&work_q, K_FOREVER), "Failed to stop work queue");

	/* Submissions to a stopped queue must be rejected with -ENODEV. */
	k_work_init(&work, work_handler);
	zassert_equal(k_work_submit_to_queue(&work_q, &work), -ENODEV,
		      "Succeeded to submit work item to non-initialized work queue");

	/* Take the semaphore the other thread released once done running the queue */
	rc = k_sem_take(&ret_sem, K_MSEC(1));
	zassert_equal(rc, 0);
}

ZTEST_SUITE(workqueue_api, NULL, NULL, NULL, NULL, NULL);
Loading