diff --git a/include/zephyr/kernel.h b/include/zephyr/kernel.h index 614b2eab1b9b8..7fe1920c14701 100644 --- a/include/zephyr/kernel.h +++ b/include/zephyr/kernel.h @@ -4785,7 +4785,7 @@ __syscall int k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size, int k_msgq_cleanup(struct k_msgq *msgq); /** - * @brief Send a message to a message queue. + * @brief Send a message to the end of a message queue. * * This routine sends a message to message queue @a q. * @@ -4806,6 +4806,33 @@ int k_msgq_cleanup(struct k_msgq *msgq); */ __syscall int k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout); +/** + * @brief Send a message to the front of a message queue. + * + * This routine sends a message to the beginning (head) of message queue @a q. + * Messages sent with this method will be retrieved before any pre-existing + * messages in the queue. + * + * @note if there is no space in the message queue, this function will + * behave the same as k_msgq_put. + * + * @note The message content is copied from @a data into @a msgq and the @a data + * pointer is not retained, so the message content will not be modified + * by this function. + * + * @funcprops \isr_ok + * + * @param msgq Address of the message queue. + * @param data Pointer to the message. + * @param timeout Waiting period to add the message, or one of the special + * values K_NO_WAIT and K_FOREVER. + * + * @retval 0 Message sent. + * @retval -ENOMSG Returned without waiting or queue purged. + * @retval -EAGAIN Waiting period timed out. + */ +__syscall int k_msgq_put_front(struct k_msgq *msgq, const void *data, k_timeout_t timeout); + /** * @brief Receive a message from a message queue. * diff --git a/include/zephyr/tracing/tracing.h b/include/zephyr/tracing/tracing.h index 13b27cdbddc99..cb12726cabcb1 100644 --- a/include/zephyr/tracing/tracing.h +++ b/include/zephyr/tracing/tracing.h @@ -1419,6 +1419,28 @@ */ #define sys_port_trace_k_msgq_put_exit(msgq, timeout, ret) +/** + * @brief Trace Message Queue put at front attempt entry + * @param msgq Message Queue object + * @param timeout Timeout period + */ +#define sys_port_trace_k_msgq_put_front_enter(msgq, timeout) + +/** + * @brief Trace Message Queue put at front attempt blocking + * @param msgq Message Queue object + * @param timeout Timeout period + */ +#define sys_port_trace_k_msgq_put_front_blocking(msgq, timeout) + +/** + * @brief Trace Message Queue put at front attempt outcome + * @param msgq Message Queue object + * @param timeout Timeout period + * @param ret Return value + */ +#define sys_port_trace_k_msgq_put_front_exit(msgq, timeout, ret) + /** * @brief Trace Message Queue get attempt entry * @param msgq Message Queue object diff --git a/kernel/msg_q.c b/kernel/msg_q.c index 7b8686ffa630c..af77ce5a5f3dd 100644 --- a/kernel/msg_q.c +++ b/kernel/msg_q.c @@ -124,8 +124,8 @@ int k_msgq_cleanup(struct k_msgq *msgq) return 0; } - -int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout) +static inline int put_msg_in_queue(struct k_msgq *msgq, const void *data, + k_timeout_t timeout, bool put_at_back) { __ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), ""); @@ -136,7 +136,11 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout key = k_spin_lock(&msgq->lock); - SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout); + if (put_at_back) { + SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout); + } else { + SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put_front, msgq, timeout); + } if (msgq->used_msgs < msgq->max_msgs) { /* message queue isn't full */ @@ -150,13 +154,30 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout arch_thread_return_value_set(pending_thread, 0); z_ready_thread(pending_thread); } else { - /* put message in queue */ __ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start && msgq->write_ptr < msgq->buffer_end); - (void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size); - msgq->write_ptr += msgq->msg_size; - if (msgq->write_ptr == msgq->buffer_end) { - msgq->write_ptr = msgq->buffer_start; + if (put_at_back) { + /* + * to write a message to the back of the queue, + * copy the message and increment write_ptr + */ + (void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size); + msgq->write_ptr += msgq->msg_size; + if (msgq->write_ptr == msgq->buffer_end) { + msgq->write_ptr = msgq->buffer_start; + } + } else { + /* + * to write a message to the head of the queue, + * first decrement the read pointer (to open + * space at the front of the queue) then copy + * the message to the newly created space. + */ + if (msgq->read_ptr == msgq->buffer_start) { + msgq->read_ptr = msgq->buffer_end; + } + msgq->read_ptr -= msgq->msg_size; + (void)memcpy(msgq->read_ptr, (char *)data, msgq->msg_size); } msgq->used_msgs++; resched = handle_poll_events(msgq); @@ -166,17 +187,31 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout /* don't wait for message space to become available */ result = -ENOMSG; } else { - SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout); + if (put_at_back) { + SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout); + } else { + SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put_front, msgq, timeout); + } /* wait for put message success, failure, or timeout */ _current->base.swap_data = (void *) data; result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout); - SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result); + + if (put_at_back) { + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result); + } else { + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result); + } + return result; } - SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result); + if (put_at_back) { + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result); + } else { + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result); + } if (resched) { z_reschedule(&msgq->lock, key); @@ -187,6 +222,17 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout return result; } + +int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout) +{ + return put_msg_in_queue(msgq, data, timeout, true); +} + +int z_impl_k_msgq_put_front(struct k_msgq *msgq, const void *data, k_timeout_t timeout) +{ + return put_msg_in_queue(msgq, data, timeout, false); +} + #ifdef CONFIG_USERSPACE static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout) @@ -197,6 +243,16 @@ static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data, return z_impl_k_msgq_put(msgq, data, timeout); } #include + +static inline int z_vrfy_k_msgq_put_front(struct k_msgq *msgq, const void *data, + k_timeout_t timeout) +{ + K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ)); + K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size)); + + return z_impl_k_msgq_put_front(msgq, data, timeout); +} +#include #endif /* CONFIG_USERSPACE */ void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs) diff --git a/samples/kernel/msg_queue/CMakeLists.txt b/samples/kernel/msg_queue/CMakeLists.txt new file mode 100644 index 0000000000000..a4852fd1fea18 --- /dev/null +++ b/samples/kernel/msg_queue/CMakeLists.txt @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) + +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(msg_queue) + +target_sources(app PRIVATE src/main.c) diff --git a/samples/kernel/msg_queue/README.rst b/samples/kernel/msg_queue/README.rst new file mode 100644 index 0000000000000..8cd550ab52793 --- /dev/null +++ b/samples/kernel/msg_queue/README.rst @@ -0,0 +1,52 @@ +.. zephyr:code-sample:: msg_queue + :name: Message Queue + + Implements a basic message queue producer/consumer thread pair. + +Overview +******** + +A sample demonstrating the basic usage of Zephyr message queues. +A producer thread sends both normal and urgent messages to be retrieved +by a consumer thread. + +Building and Running +******************** + +This application can be built and executed on QEMU as follows: + +.. zephyr-app-commands:: + :zephyr-app: samples/kernel/msg_queue + :host-os: unix + :board: qemu_x86 + :goals: run + :compact: + +To build for another board target, replace "qemu_x86" above with it. + +Sample Output +============= + +Every normal message is put at the end of the queue, and they are delivered +in FIFO order. Every "urgent" message is put at the beginning of the queue, +and it is delivered first as long as no other "urgent" message comes in after +it. + +In this sample, one producer thread sends 1 urgent message for each 2 normal +ones. Note that message C is the first retrieved because it was the last one +sent as "urgent". + +.. code-block:: console + + [producer] sending: 0 + [producer] sending: 1 + [producer] sending: A (urgent) + [producer] sending: 2 + [producer] sending: 3 + [producer] sending: B (urgent) + [producer] sending: 4 + [producer] sending: 5 + [producer] sending: C (urgent) + [consumer] got sequence: CBA012345 + +Exit QEMU by pressing :kbd:`CTRL+A` :kbd:`x`. diff --git a/samples/kernel/msg_queue/prj.conf b/samples/kernel/msg_queue/prj.conf new file mode 100644 index 0000000000000..b2a4ba591044e --- /dev/null +++ b/samples/kernel/msg_queue/prj.conf @@ -0,0 +1 @@ +# nothing here diff --git a/samples/kernel/msg_queue/sample.yaml b/samples/kernel/msg_queue/sample.yaml new file mode 100644 index 0000000000000..e4156ef927632 --- /dev/null +++ b/samples/kernel/msg_queue/sample.yaml @@ -0,0 +1,15 @@ +sample: + description: Message queue demo sample + name: message queue +common: + tags: + - message_queue +tests: + sample.kernel.msgqueue: + integration_platforms: + - native_sim + harness: console + harness_config: + type: one_line + regex: + - ".*CBA012345" diff --git a/samples/kernel/msg_queue/src/main.c b/samples/kernel/msg_queue/src/main.c new file mode 100644 index 0000000000000..586dec68fe0ff --- /dev/null +++ b/samples/kernel/msg_queue/src/main.c @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2025 Instituto Superior de Engenharia do Porto (ISEP). + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#define BUF_SIZE 10 +#define INACTIVE -1 +#define PRIORITY 5 + +K_MSGQ_DEFINE(my_msgq, sizeof(char), BUF_SIZE, 1); + +void producer_function(void *rec, void *p2, void *p3) +{ + k_tid_t receiving_thread = (k_tid_t) rec; + + char normal_data = '0'; + char urgent_data = 'A'; + int total_sent = 0; + + /* + * sends messages every 100 msec, in repeating + * sequence: normal, normal, urgent, ... + */ + while (total_sent < (BUF_SIZE - 1)) { + for (int i = 0; i < 2; i++) { + printk("[producer] sending: %c\n", normal_data); + k_msgq_put(&my_msgq, &normal_data, K_NO_WAIT); + k_sleep(K_MSEC(100)); + normal_data++; + } + printk("[producer] sending: %c (urgent)\n", urgent_data); + k_msgq_put_front(&my_msgq, &urgent_data, K_NO_WAIT); + k_sleep(K_MSEC(100)); + urgent_data++; + + total_sent += 3; + } + + /* + * finished sending messages, now start the receiving thread. + * keep in mind both threads can be running at the same time, + * but in this example we wish to see the queue accumulate some + * messages before the receiver thread starts reading them out. + */ + k_thread_start(receiving_thread); +} + +void consumer_function(void *p1, void *p2, void *p3) +{ + char received[BUF_SIZE]; + + for (int i = 0; i < (BUF_SIZE - 1); i++) { + k_msgq_get(&my_msgq, &received[i], K_NO_WAIT); + } + + received[BUF_SIZE - 1] = '\0'; + /* we expect to see CBA012345... */ + printk("[consumer] got sequence: %s\n", received); +} + +K_THREAD_DEFINE(consumer_thread, 2048, consumer_function, + NULL, NULL, NULL, PRIORITY, 0, INACTIVE); + +K_THREAD_DEFINE(producer_thread, 2048, producer_function, + ((void *) consumer_thread), NULL, NULL, PRIORITY, 0, 0); diff --git a/subsys/tracing/ctf/tracing_ctf.h b/subsys/tracing/ctf/tracing_ctf.h index c1d8b65bd6b30..35104679a1872 100644 --- a/subsys/tracing/ctf/tracing_ctf.h +++ b/subsys/tracing/ctf/tracing_ctf.h @@ -263,6 +263,9 @@ extern "C" { #define sys_port_trace_k_msgq_put_enter(msgq, timeout) #define sys_port_trace_k_msgq_put_blocking(msgq, timeout) #define sys_port_trace_k_msgq_put_exit(msgq, timeout, ret) +#define sys_port_trace_k_msgq_put_front_enter(msgq, timeout) +#define sys_port_trace_k_msgq_put_front_blocking(msgq, timeout) +#define sys_port_trace_k_msgq_put_front_exit(msgq, timeout, ret) #define sys_port_trace_k_msgq_get_enter(msgq, timeout) #define sys_port_trace_k_msgq_get_blocking(msgq, timeout) #define sys_port_trace_k_msgq_get_exit(msgq, timeout, ret) diff --git a/subsys/tracing/sysview/tracing_sysview.h b/subsys/tracing/sysview/tracing_sysview.h index d13ab42dd8de6..9bbbfb7ab6c90 100644 --- a/subsys/tracing/sysview/tracing_sysview.h +++ b/subsys/tracing/sysview/tracing_sysview.h @@ -513,6 +513,9 @@ void sys_trace_thread_info(struct k_thread *thread); #define sys_port_trace_k_msgq_put_enter(msgq, timeout) #define sys_port_trace_k_msgq_put_blocking(msgq, timeout) #define sys_port_trace_k_msgq_put_exit(msgq, timeout, ret) +#define sys_port_trace_k_msgq_put_front_enter(msgq, timeout) +#define sys_port_trace_k_msgq_put_front_blocking(msgq, timeout) +#define sys_port_trace_k_msgq_put_front_exit(msgq, timeout, ret) #define sys_port_trace_k_msgq_get_enter(msgq, timeout) #define sys_port_trace_k_msgq_get_blocking(msgq, timeout) #define sys_port_trace_k_msgq_get_exit(msgq, timeout, ret) diff --git a/subsys/tracing/test/tracing_test.h b/subsys/tracing/test/tracing_test.h index a6208337e382b..8d0a288e26ba6 100644 --- a/subsys/tracing/test/tracing_test.h +++ b/subsys/tracing/test/tracing_test.h @@ -312,6 +312,14 @@ sys_trace_k_msgq_put_blocking(msgq, data, timeout) #define sys_port_trace_k_msgq_put_exit(msgq, timeout, ret) \ sys_trace_k_msgq_put_exit(msgq, data, timeout, ret) + +#define sys_port_trace_k_msgq_put_front_enter(msgq, timeout) \ + sys_trace_k_msgq_put_front_enter(msgq, data, timeout) +#define sys_port_trace_k_msgq_put_front_blocking(msgq, timeout) \ + sys_trace_k_msgq_put_front_blocking(msgq, data, timeout) +#define sys_port_trace_k_msgq_put_front_exit(msgq, timeout, ret) \ + sys_trace_k_msgq_put_front_exit(msgq, data, timeout, ret) + #define sys_port_trace_k_msgq_get_enter(msgq, timeout) \ sys_trace_k_msgq_get_enter(msgq, data, timeout) #define sys_port_trace_k_msgq_get_blocking(msgq, timeout) \ @@ -684,6 +692,11 @@ void sys_trace_k_msgq_cleanup_exit(struct k_msgq *msgq, int ret); void sys_trace_k_msgq_put_enter(struct k_msgq *msgq, const void *data, k_timeout_t timeout); void sys_trace_k_msgq_put_blocking(struct k_msgq *msgq, const void *data, k_timeout_t timeout); void sys_trace_k_msgq_put_exit(struct k_msgq *msgq, const void *data, k_timeout_t timeout, int ret); +void sys_trace_k_msgq_put_front_enter(struct k_msgq *msgq, const void *data, k_timeout_t timeout); +void sys_trace_k_msgq_put_front_blocking(struct k_msgq *msgq, const void *data, + k_timeout_t timeout); +void sys_trace_k_msgq_put_front_exit(struct k_msgq *msgq, const void *data, k_timeout_t timeout, + int ret); void sys_trace_k_msgq_get_enter(struct k_msgq *msgq, const void *data, k_timeout_t timeout); void sys_trace_k_msgq_get_blocking(struct k_msgq *msgq, const void *data, k_timeout_t timeout); void sys_trace_k_msgq_get_exit(struct k_msgq *msgq, const void *data, k_timeout_t timeout, int ret); diff --git a/subsys/tracing/user/tracing_user.h b/subsys/tracing/user/tracing_user.h index 9d1fd6f8c7b9f..0cef2626ee075 100644 --- a/subsys/tracing/user/tracing_user.h +++ b/subsys/tracing/user/tracing_user.h @@ -304,6 +304,9 @@ void sys_trace_gpio_fire_callback_user(const struct device *port, struct gpio_ca #define sys_port_trace_k_msgq_put_enter(msgq, timeout) #define sys_port_trace_k_msgq_put_blocking(msgq, timeout) #define sys_port_trace_k_msgq_put_exit(msgq, timeout, ret) +#define sys_port_trace_k_msgq_put_front_enter(msgq, timeout) +#define sys_port_trace_k_msgq_put_front_blocking(msgq, timeout) +#define sys_port_trace_k_msgq_put_front_exit(msgq, timeout, ret) #define sys_port_trace_k_msgq_get_enter(msgq, timeout) #define sys_port_trace_k_msgq_get_blocking(msgq, timeout) #define sys_port_trace_k_msgq_get_exit(msgq, timeout, ret)