-
Notifications
You must be signed in to change notification settings - Fork 7.7k
kernel: msgq: adding support for k_msgq_put_front
and sample code
#92865
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4785,7 +4785,7 @@ __syscall int k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size, | |
int k_msgq_cleanup(struct k_msgq *msgq); | ||
|
||
/** | ||
* @brief Send a message to a message queue. | ||
* @brief Send a message to the end of a message queue. | ||
* | ||
* This routine sends a message to message queue @a q. | ||
* | ||
|
@@ -4806,6 +4806,33 @@ int k_msgq_cleanup(struct k_msgq *msgq); | |
*/ | ||
__syscall int k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout); | ||
|
||
/** | ||
* @brief Send a message to the front of a message queue. | ||
* | ||
* This routine sends a message to the beginning (head) of message queue @a q. | ||
* Messages sent with this method will be retrieved before any pre-existing | ||
* messages in the queue. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that it is worthwhile to point out that this urgent behavior only applies only if there is available space in the message queue's buffer. If that buffer is full, then all urgency is ignored and it behaves the same as k_msgq_put(). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have ditched the @note if there is no space in the message queue, this function will behave the same as k_msgq_put. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for adding the note. I still think that it is important to have in there. |
||
* | ||
* @note if there is no space in the message queue, this function will | ||
* behave the same as k_msgq_put. | ||
* | ||
* @note The message content is copied from @a data into @a msgq and the @a data | ||
* pointer is not retained, so the message content will not be modified | ||
* by this function. | ||
* | ||
* @funcprops \isr_ok | ||
* | ||
* @param msgq Address of the message queue. | ||
* @param data Pointer to the message. | ||
* @param timeout Waiting period to add the message, or one of the special | ||
* values K_NO_WAIT and K_FOREVER. | ||
* | ||
* @retval 0 Message sent. | ||
* @retval -ENOMSG Returned without waiting or queue purged. | ||
* @retval -EAGAIN Waiting period timed out. | ||
*/ | ||
__syscall int k_msgq_put_front(struct k_msgq *msgq, const void *data, k_timeout_t timeout); | ||
|
||
/** | ||
* @brief Receive a message from a message queue. | ||
* | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -124,8 +124,8 @@ int k_msgq_cleanup(struct k_msgq *msgq) | |||||||||||||||||
return 0; | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout) | ||||||||||||||||||
static inline int put_msg_in_queue(struct k_msgq *msgq, const void *data, | ||||||||||||||||||
k_timeout_t timeout, bool put_at_back) | ||||||||||||||||||
{ | ||||||||||||||||||
__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), ""); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, that removal was by accident. I have restored it to the original state. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is condition for If a thread go to sleep, we need additional bit to point out the reason to sleep. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both checks are needed as blocking calls are not allowed at ISR level. !arch_is_in_isr() : If true, then we are at at thread level, which is OK. Bail out of the assert. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
You can cherry-pick from my PR, just add me to co-authoer, I will also appreciate it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @TaiJuWu I will check that with my advisor, but as far as I know I unfortunately can't add you as co-author because this contribution is related to my thesis work, and thus it is required of me to be the only author. I have added an extra commit exclusively for the tracing following your contribution. Cherry-picking wasn't possible due to the suffix difference. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am fine for every decision you make ;) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Thanks for your explanation! |
||||||||||||||||||
|
||||||||||||||||||
|
@@ -136,7 +136,11 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout | |||||||||||||||||
|
||||||||||||||||||
key = k_spin_lock(&msgq->lock); | ||||||||||||||||||
|
||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout); | ||||||||||||||||||
if (put_at_back) { | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout); | ||||||||||||||||||
} else { | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put_front, msgq, timeout); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
if (msgq->used_msgs < msgq->max_msgs) { | ||||||||||||||||||
/* message queue isn't full */ | ||||||||||||||||||
|
@@ -150,13 +154,31 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout | |||||||||||||||||
arch_thread_return_value_set(pending_thread, 0); | ||||||||||||||||||
z_ready_thread(pending_thread); | ||||||||||||||||||
} else { | ||||||||||||||||||
/* put message in queue */ | ||||||||||||||||||
__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start && | ||||||||||||||||||
msgq->write_ptr < msgq->buffer_end); | ||||||||||||||||||
(void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size); | ||||||||||||||||||
msgq->write_ptr += msgq->msg_size; | ||||||||||||||||||
if (msgq->write_ptr == msgq->buffer_end) { | ||||||||||||||||||
msgq->write_ptr = msgq->buffer_start; | ||||||||||||||||||
if (put_at_back) { | ||||||||||||||||||
/* | ||||||||||||||||||
* writing a message to the back of the queue is | ||||||||||||||||||
* simple and effective: copy the message, then | ||||||||||||||||||
* increment write_ptr. | ||||||||||||||||||
Comment on lines
+161
to
+163
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
*/ | ||||||||||||||||||
(void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size); | ||||||||||||||||||
msgq->write_ptr += msgq->msg_size; | ||||||||||||||||||
if (msgq->write_ptr == msgq->buffer_end) { | ||||||||||||||||||
msgq->write_ptr = msgq->buffer_start; | ||||||||||||||||||
} | ||||||||||||||||||
} else { | ||||||||||||||||||
/* | ||||||||||||||||||
* writing a message to the head of the queue can | ||||||||||||||||||
* be achieved by simply decrementing the read pointer | ||||||||||||||||||
* to open space at the front of the queue, then | ||||||||||||||||||
* copying the message to the newly created space. | ||||||||||||||||||
Comment on lines
+172
to
+175
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
*/ | ||||||||||||||||||
if (msgq->read_ptr == msgq->buffer_start) { | ||||||||||||||||||
msgq->read_ptr = msgq->buffer_end; | ||||||||||||||||||
} | ||||||||||||||||||
msgq->read_ptr -= msgq->msg_size; | ||||||||||||||||||
(void)memcpy(msgq->read_ptr, (char *)data, msgq->msg_size); | ||||||||||||||||||
} | ||||||||||||||||||
msgq->used_msgs++; | ||||||||||||||||||
resched = handle_poll_events(msgq); | ||||||||||||||||||
|
@@ -166,17 +188,31 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout | |||||||||||||||||
/* don't wait for message space to become available */ | ||||||||||||||||||
result = -ENOMSG; | ||||||||||||||||||
} else { | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout); | ||||||||||||||||||
if (put_at_back) { | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout); | ||||||||||||||||||
} else { | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put_front, msgq, timeout); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/* wait for put message success, failure, or timeout */ | ||||||||||||||||||
_current->base.swap_data = (void *) data; | ||||||||||||||||||
|
||||||||||||||||||
result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout); | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result); | ||||||||||||||||||
|
||||||||||||||||||
if (put_at_back) { | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result); | ||||||||||||||||||
} else { | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
return result; | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result); | ||||||||||||||||||
if (put_at_back) { | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result); | ||||||||||||||||||
} else { | ||||||||||||||||||
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
if (resched) { | ||||||||||||||||||
z_reschedule(&msgq->lock, key); | ||||||||||||||||||
|
@@ -187,6 +223,17 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout | |||||||||||||||||
return result; | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout) | ||||||||||||||||||
{ | ||||||||||||||||||
return put_msg_in_queue(msgq, data, timeout, true); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
int z_impl_k_msgq_put_front(struct k_msgq *msgq, const void *data, k_timeout_t timeout) | ||||||||||||||||||
{ | ||||||||||||||||||
return put_msg_in_queue(msgq, data, timeout, false); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
#ifdef CONFIG_USERSPACE | ||||||||||||||||||
static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data, | ||||||||||||||||||
k_timeout_t timeout) | ||||||||||||||||||
|
@@ -197,6 +244,16 @@ static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data, | |||||||||||||||||
return z_impl_k_msgq_put(msgq, data, timeout); | ||||||||||||||||||
} | ||||||||||||||||||
#include <zephyr/syscalls/k_msgq_put_mrsh.c> | ||||||||||||||||||
|
||||||||||||||||||
static inline int z_vrfy_k_msgq_put_front(struct k_msgq *msgq, const void *data, | ||||||||||||||||||
k_timeout_t timeout) | ||||||||||||||||||
{ | ||||||||||||||||||
K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ)); | ||||||||||||||||||
K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size)); | ||||||||||||||||||
|
||||||||||||||||||
return z_impl_k_msgq_put_front(msgq, data, timeout); | ||||||||||||||||||
} | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we are missing ... #include <zephyr/syscalls/k_msgq_put_front_mrsh.c> There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. really? ok, but what's that for? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's part of the system call infrastructure--for marshalling function. |
||||||||||||||||||
#include <zephyr/syscalls/k_msgq_put_front_mrsh.c> | ||||||||||||||||||
#endif /* CONFIG_USERSPACE */ | ||||||||||||||||||
|
||||||||||||||||||
void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs) | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
cmake_minimum_required(VERSION 3.20.0) | ||
|
||
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) | ||
project(msg_queue) | ||
|
||
target_sources(app PRIVATE src/main.c) |
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,50 @@ | ||||||||||||||
.. zephyr:code-sample:: msg_queue | ||||||||||||||
:name: Message Queue | ||||||||||||||
|
||||||||||||||
Implements a basic message queue producer/consumer thread pair. | ||||||||||||||
|
||||||||||||||
Overview | ||||||||||||||
******** | ||||||||||||||
|
||||||||||||||
A simple sample to demonstrate the basic usage of Zephyr message queues. | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
a producer thread sends both normal and urgent messages. | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
Building and Running | ||||||||||||||
******************** | ||||||||||||||
|
||||||||||||||
This application can be built and executed on QEMU as follows: | ||||||||||||||
|
||||||||||||||
.. zephyr-app-commands:: | ||||||||||||||
:zephyr-app: samples/kernel/msg_queue | ||||||||||||||
:host-os: unix | ||||||||||||||
:board: qemu_x86 | ||||||||||||||
:goals: run | ||||||||||||||
:compact: | ||||||||||||||
|
||||||||||||||
To build for another board, change "qemu_x86" above to that board's name. | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
Sample Output | ||||||||||||||
============= | ||||||||||||||
|
||||||||||||||
Every normal message sending implies sending the message to the end of the | ||||||||||||||
queue, and the first message to go is the first to be delivered. Every "urgent" | ||||||||||||||
message, at its turn, implies sending the message to the beginning of the queue. | ||||||||||||||
Comment on lines
+29
to
+31
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
|
||||||||||||||
In this sample, one producer thread sends 1 urgent message for each 2 regular | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
ones. Note that message C is the first retrieved because it was the last one | ||||||||||||||
sent as "urgent". | ||||||||||||||
|
||||||||||||||
.. code-block:: console | ||||||||||||||
[producer] sending: 0 | ||||||||||||||
[producer] sending: 1 | ||||||||||||||
[producer] sending: A (urgent) | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. urgent or front? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in the context of the sample, the publisher thread sends normal and urgent messages. It's not related to the API naming. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, thanks. |
||||||||||||||
[producer] sending: 2 | ||||||||||||||
[producer] sending: 3 | ||||||||||||||
[producer] sending: B (urgent) | ||||||||||||||
[producer] sending: 4 | ||||||||||||||
[producer] sending: 5 | ||||||||||||||
[producer] sending: C (urgent) | ||||||||||||||
[consumer] got sequence: CBA012345 | ||||||||||||||
Exit QEMU by pressing :kbd:`CTRL+A` :kbd:`x`. |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,2 @@ | ||||
# nothing here | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||
CONFIG_TRACING=y |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,17 @@ | ||||||||||
sample: | ||||||||||
description: A message queue usage sample | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
name: message queue sample | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
common: | ||||||||||
tags: | ||||||||||
- message_queue | ||||||||||
tests: | ||||||||||
sample.kernel.msgqueue: | ||||||||||
integration_platforms: | ||||||||||
- native_sim | ||||||||||
tags: | ||||||||||
- message_queue | ||||||||||
Comment on lines
+11
to
+12
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
already above |
||||||||||
harness: console | ||||||||||
harness_config: | ||||||||||
type: one_line | ||||||||||
regex: | ||||||||||
- ".*CBA012345" |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,68 @@ | ||||||
/* | ||||||
* Copyright (c) 2025 Instituto Superior de Engenharia do Porto (ISEP). | ||||||
* | ||||||
* SPDX-License-Identifier: Apache-2.0 | ||||||
*/ | ||||||
|
||||||
#include <zephyr/kernel.h> | ||||||
|
||||||
#define BUF_SIZE 10 | ||||||
#define INACTIVE -1 | ||||||
#define PRIORITY 5 | ||||||
|
||||||
K_MSGQ_DEFINE(my_msgq, sizeof(char), BUF_SIZE, 1); | ||||||
|
||||||
void producer_function(void *rec, void *p2, void *p3) | ||||||
{ | ||||||
k_tid_t receiving_thread = (k_tid_t) rec; | ||||||
|
||||||
char normal_data = '0'; | ||||||
char urgent_data = 'A'; | ||||||
int total_sent = 0; | ||||||
|
||||||
/* | ||||||
* sends messages every 100 msec, in repeating | ||||||
* sequence normal, normal, urgent, ... | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
*/ | ||||||
while (total_sent < (BUF_SIZE - 1)) { | ||||||
for (int i = 0; i < 2; i++) { | ||||||
printk("[producer] sending: %c\n", normal_data); | ||||||
k_msgq_put(&my_msgq, &normal_data, K_NO_WAIT); | ||||||
k_sleep(K_MSEC(100)); | ||||||
normal_data++; | ||||||
} | ||||||
printk("[producer] sending: %c (urgent)\n", urgent_data); | ||||||
k_msgq_put_front(&my_msgq, &urgent_data, K_NO_WAIT); | ||||||
k_sleep(K_MSEC(100)); | ||||||
urgent_data++; | ||||||
|
||||||
total_sent += 3; | ||||||
} | ||||||
|
||||||
/* | ||||||
* finished sending messages, now start the receiving thread. | ||||||
* keep in mind both threads can be running at the same time, | ||||||
* but in this example we wish to see the queue accumulate some | ||||||
* messages before the receiver thread starts reading them out. | ||||||
*/ | ||||||
k_thread_start(receiving_thread); | ||||||
} | ||||||
|
||||||
void consumer_function(void *p1, void *p2, void *p3) | ||||||
{ | ||||||
char received[BUF_SIZE]; | ||||||
|
||||||
for (int i = 0; i < (BUF_SIZE - 1); i++) { | ||||||
k_msgq_get(&my_msgq, &received[i], K_NO_WAIT); | ||||||
} | ||||||
|
||||||
received[BUF_SIZE - 1] = '\0'; | ||||||
/* we expect to see CBA012345... */ | ||||||
printk("[consumer] got sequence: %s\n", received); | ||||||
} | ||||||
|
||||||
K_THREAD_DEFINE(consumer_thread, 2048, consumer_function, | ||||||
NULL, NULL, NULL, PRIORITY, 0, INACTIVE); | ||||||
|
||||||
K_THREAD_DEFINE(producer_thread, 2048, producer_function, | ||||||
((void *) consumer_thread), NULL, NULL, PRIORITY, 0, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The commit message still has a referent to k_msgq_put_urgent instead of k_msgq_put_front.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the notice, changed it already.