Skip to content

Commit d4fac33

Browse files
author
Alexander pinheiro paschoaletto
committed
kernel: msgq: adding support for k_msgq_put_urgent and sample code
This commit introduces the k_msgq_put_urgent API for sending messages to a queue in a LIFO scheme. It also adds a simple sample code that demonstrates how it works. Signed-off-by: Alexander pinheiro paschoaletto <axelpinheiro@gmail.com>
1 parent d4deeeb commit d4fac33

File tree

7 files changed

+268
-1
lines changed

7 files changed

+268
-1
lines changed

include/zephyr/kernel.h

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4694,7 +4694,7 @@ __syscall int k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
46944694
int k_msgq_cleanup(struct k_msgq *msgq);
46954695

46964696
/**
4697-
* @brief Send a message to a message queue.
4697+
* @brief Send a message to the end of a message queue.
46984698
*
46994699
* This routine sends a message to message queue @a q.
47004700
*
@@ -4715,6 +4715,30 @@ int k_msgq_cleanup(struct k_msgq *msgq);
47154715
*/
47164716
__syscall int k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout);
47174717

4718+
/**
4719+
* @brief Send a message to the beginning of a message queue.
4720+
*
4721+
* This routine sends a message to the beginning (head) of message queue @a q.
4722+
* Messages sent with this method will be retrieved before any pre-existing
4723+
* messages in the queue.
4724+
*
4725+
* @note The message content is copied from @a data into @a msgq and the @a data
4726+
* pointer is not retained, so the message content will not be modified
4727+
* by this function.
4728+
*
4729+
* @funcprops \isr_ok
4730+
*
4731+
* @param msgq Address of the message queue.
4732+
* @param data Pointer to the message.
4733+
* @param timeout Waiting period to add the message, or one of the special
4734+
* values K_NO_WAIT and K_FOREVER.
4735+
*
4736+
* @retval 0 Message sent.
4737+
* @retval -ENOMSG Returned without waiting or queue purged.
4738+
* @retval -EAGAIN Waiting period timed out.
4739+
*/
4740+
__syscall int k_msgq_put_urgent(struct k_msgq *msgq, const void *data, k_timeout_t timeout);
4741+
47184742
/**
47194743
* @brief Receive a message from a message queue.
47204744
*

kernel/msg_q.c

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,100 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout
183183
return result;
184184
}
185185

186+
int z_impl_k_msgq_put_urgent(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
187+
{
188+
__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");
189+
190+
struct k_thread *pending_thread;
191+
k_spinlock_key_t key;
192+
int result;
193+
194+
key = k_spin_lock(&msgq->lock);
195+
196+
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);
197+
198+
if (msgq->used_msgs < msgq->max_msgs) {
199+
/* message queue isn't full */
200+
pending_thread = z_unpend_first_thread(&msgq->wait_q);
201+
if (unlikely(pending_thread != NULL)) {
202+
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, 0);
203+
204+
/* give message to waiting thread */
205+
(void)memcpy(pending_thread->base.swap_data, data,
206+
msgq->msg_size);
207+
/* wake up waiting thread */
208+
arch_thread_return_value_set(pending_thread, 0);
209+
z_ready_thread(pending_thread);
210+
z_reschedule(&msgq->lock, key);
211+
return 0;
212+
} else {
213+
/*
214+
* urgent put = write message to the head of the queue.
215+
* this is achieved in practice by simply:
216+
*
217+
* 1. decrementing the read pointer. This effectively
218+
* opens space for the incoming message in the head of
219+
* the queue.
220+
*
221+
* 2. temporarily matching the read pointer with the
222+
* write pointer. This way the message will be written
223+
* in the recently opened space.
224+
*
225+
* 3. reverting the write pointer after the write to its
226+
* original value. The read pointer, by its turn, should
227+
* be always at the head, so it doesn't need reverting
228+
* because we just wrote to the head.
229+
*/
230+
char *original_write_ptr = msgq->write_ptr;
231+
232+
/* decrement the read pointer */
233+
msgq->read_ptr -= msgq->msg_size;
234+
if(msgq->read_ptr < msgq->buffer_start){
235+
msgq->read_ptr = msgq->buffer_end - msgq->msg_size;
236+
}
237+
238+
/* match read and write pointers */
239+
msgq->write_ptr = msgq->read_ptr;
240+
241+
/* put message in queue (like usual) */
242+
__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
243+
msgq->write_ptr < msgq->buffer_end);
244+
(void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size);
245+
msgq->write_ptr += msgq->msg_size;
246+
if (msgq->write_ptr == msgq->buffer_end) {
247+
msgq->write_ptr = msgq->buffer_start;
248+
}
249+
msgq->used_msgs++;
250+
251+
/* revert write pointer to the original value */
252+
msgq->write_ptr = original_write_ptr;
253+
254+
#ifdef CONFIG_POLL
255+
handle_poll_events(msgq, K_POLL_STATE_MSGQ_DATA_AVAILABLE);
256+
#endif /* CONFIG_POLL */
257+
}
258+
result = 0;
259+
} else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
260+
/* don't wait for message space to become available */
261+
result = -ENOMSG;
262+
} else {
263+
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);
264+
265+
/* wait for put message success, failure, or timeout */
266+
_current->base.swap_data = (void *) data;
267+
268+
result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
269+
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
270+
return result;
271+
}
272+
273+
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
274+
275+
z_reschedule(&msgq->lock, key);
276+
277+
return result;
278+
}
279+
186280
#ifdef CONFIG_USERSPACE
187281
static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
188282
k_timeout_t timeout)
@@ -192,6 +286,15 @@ static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
192286

193287
return z_impl_k_msgq_put(msgq, data, timeout);
194288
}
289+
290+
static inline int z_vrfy_k_msgq_put_urgent(struct k_msgq *msgq, const void *data,
291+
k_timeout_t timeout)
292+
{
293+
K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
294+
K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));
295+
296+
return z_impl_k_msgq_put_urgent(msgq, data, timeout);
297+
}
195298
#include <zephyr/syscalls/k_msgq_put_mrsh.c>
196299
#endif /* CONFIG_USERSPACE */
197300

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
3+
cmake_minimum_required(VERSION 3.20.0)
4+
5+
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
6+
project(msg_queue)
7+
8+
target_sources(app PRIVATE src/main.c)

samples/basic/msg_queue/README.rst

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
.. zephyr:code-sample:: msg_queue
2+
:name: Message Queue
3+
4+
Implements a basic message queue producer/consumer thread pair.
5+
6+
Overview
7+
********
8+
9+
A simple sample to demonstrate the basic usage of Zephyr message queues.
10+
a producer thread sends both normal and urgent messages.
11+
12+
Building and Running
13+
********************
14+
15+
This application can be built and executed on QEMU as follows:
16+
17+
.. zephyr-app-commands::
18+
:zephyr-app: samples/basic/msg_queue
19+
:host-os: unix
20+
:board: qemu_x86
21+
:goals: run
22+
:compact:
23+
24+
To build for another board, change "qemu_x86" above to that board's name.
25+
26+
Sample Output
27+
=============
28+
29+
Every normal message sending implies sending the message to the end of the
30+
queue, and the first message to go is the first to be delivered. Every "urgent"
31+
message, at its turn, implies sending the message to the beginning of the queue.
32+
33+
In this sample, one producer thread sends 1 urgent message for each 2 regular
34+
ones. Note that message C is the first retrieved because it was the last one
35+
sent as "urgent".
36+
37+
.. code-block:: console
38+
39+
[producer] sending: 0
40+
[producer] sending: 1
41+
[producer] sending: A (urgent)
42+
[producer] sending: 2
43+
[producer] sending: 3
44+
[producer] sending: B (urgent)
45+
[producer] sending: 4
46+
[producer] sending: 5
47+
[producer] sending: C (urgent)
48+
[consumer] got sequence: CBA012345
49+
50+
Exit QEMU by pressing :kbd:`CTRL+A` :kbd:`x`.

samples/basic/msg_queue/prj.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# nothing here

samples/basic/msg_queue/sample.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
sample:
2+
description: A message queue usage sample
3+
name: message queue sample
4+
common:
5+
tags:
6+
- message queues
7+
tests:
8+
sample.msgqueue:
9+
build_only: true
10+
platform_allow:
11+
- qemu_riscv32
12+
- qemu_cortex_a53
13+
integration_platforms:
14+
- qemu_riscv32
15+
tags:
16+
- message
17+
- queue

samples/basic/msg_queue/src/main.c

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2025 Instituto Superior de Engenharia do Porto (ISEP).
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#include <zephyr/kernel.h>
8+
9+
#define BUF_SIZE 10
10+
#define INACTIVE -1
11+
#define PRIORITY 5
12+
13+
K_MSGQ_DEFINE(my_msgq, sizeof(char), BUF_SIZE, 1);
14+
15+
void producer_function(void *rec, void *p2, void *p3){
16+
17+
k_tid_t receiving_thread = (k_tid_t) rec;
18+
19+
char normal_data = '0';
20+
char urgent_data = 'A';
21+
int total_sent = 0;
22+
23+
/*
24+
* sends messages every 100 msec, in repeating
25+
* sequence normal, normal, urgent, ...
26+
*/
27+
while(total_sent < (BUF_SIZE - 1)){
28+
for(int i = 0; i < 2; i++){
29+
printk("[producer] sending: %c\n", normal_data);
30+
k_msgq_put(&my_msgq, &normal_data, K_NO_WAIT);
31+
k_sleep(K_MSEC(100));
32+
normal_data++;
33+
}
34+
printk("[producer] sending: %c (urgent)\n", urgent_data);
35+
k_msgq_put_urgent(&my_msgq, &urgent_data, K_NO_WAIT);
36+
k_sleep(K_MSEC(100));
37+
urgent_data++;
38+
39+
total_sent += 3;
40+
}
41+
42+
/*
43+
* finished sending messages, now start the receiving thread.
44+
* keep in mind both threads can be running at the same time,
45+
* but in this example we wish to see the queue accumulate some
46+
* messages before the receiver thread starts reading them out.
47+
*/
48+
k_thread_start(receiving_thread);
49+
}
50+
51+
void consumer_function(void *p1, void *p2, void *p3){s
52+
char received[BUF_SIZE];
53+
54+
for(int i = 0; i < (BUF_SIZE - 1); i++){
55+
k_msgq_get(&my_msgq, &received[i], K_NO_WAIT);
56+
}
57+
58+
received[BUF_SIZE - 1] = '\0';
59+
/* we expect to see CBA012345... */
60+
printk("[consumer] got sequence: %s\n", received);
61+
}
62+
63+
K_THREAD_DEFINE(consumer_thread, 2048, consumer_function, NULL, NULL, NULL, PRIORITY, 0, INACTIVE);
64+
K_THREAD_DEFINE(producer_thread, 2048, producer_function, ((void *) consumer_thread), NULL, NULL, PRIORITY, 0, 0);

0 commit comments

Comments
 (0)