Skip to content

Commit 85938e2

Browse files
author
Alexander pinheiro paschoaletto
committed
kernel: msgq: adding support for k_msgq_put_urgent and sample code
This commit introduces the k_msgq_put_urgent API for sending messages to a queue in a LIFO scheme. It also adds a simple sample code that demonstrates how it works. Signed-off-by: Alexander pinheiro paschoaletto <axelpinheiro@gmail.com>
1 parent 808ee17 commit 85938e2

File tree

7 files changed

+268
-1
lines changed

7 files changed

+268
-1
lines changed

include/zephyr/kernel.h

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4785,7 +4785,7 @@ __syscall int k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
47854785
int k_msgq_cleanup(struct k_msgq *msgq);
47864786

47874787
/**
4788-
* @brief Send a message to a message queue.
4788+
* @brief Send a message to the end of a message queue.
47894789
*
47904790
* This routine sends a message to message queue @a q.
47914791
*
@@ -4806,6 +4806,30 @@ int k_msgq_cleanup(struct k_msgq *msgq);
48064806
*/
48074807
__syscall int k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout);
48084808

4809+
/**
4810+
* @brief Send a message to the beginning of a message queue.
4811+
*
4812+
* This routine sends a message to the beginning (head) of message queue @a q.
4813+
* Messages sent with this method will be retrieved before any pre-existing
4814+
* messages in the queue.
4815+
*
4816+
* @note The message content is copied from @a data into @a msgq and the @a data
4817+
* pointer is not retained, so the message content will not be modified
4818+
* by this function.
4819+
*
4820+
* @funcprops \isr_ok
4821+
*
4822+
* @param msgq Address of the message queue.
4823+
* @param data Pointer to the message.
4824+
* @param timeout Waiting period to add the message, or one of the special
4825+
* values K_NO_WAIT and K_FOREVER.
4826+
*
4827+
* @retval 0 Message sent.
4828+
* @retval -ENOMSG Returned without waiting or queue purged.
4829+
* @retval -EAGAIN Waiting period timed out.
4830+
*/
4831+
__syscall int k_msgq_put_urgent(struct k_msgq *msgq, const void *data, k_timeout_t timeout);
4832+
48094833
/**
48104834
* @brief Receive a message from a message queue.
48114835
*

kernel/msg_q.c

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,100 @@ int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout
187187
return result;
188188
}
189189

190+
int z_impl_k_msgq_put_urgent(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
191+
{
192+
__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");
193+
194+
struct k_thread *pending_thread;
195+
k_spinlock_key_t key;
196+
int result;
197+
198+
key = k_spin_lock(&msgq->lock);
199+
200+
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);
201+
202+
if (msgq->used_msgs < msgq->max_msgs) {
203+
/* message queue isn't full */
204+
pending_thread = z_unpend_first_thread(&msgq->wait_q);
205+
if (unlikely(pending_thread != NULL)) {
206+
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, 0);
207+
208+
/* give message to waiting thread */
209+
(void)memcpy(pending_thread->base.swap_data, data,
210+
msgq->msg_size);
211+
/* wake up waiting thread */
212+
arch_thread_return_value_set(pending_thread, 0);
213+
z_ready_thread(pending_thread);
214+
z_reschedule(&msgq->lock, key);
215+
return 0;
216+
} else {
217+
/*
218+
* urgent put = write message to the head of the queue.
219+
* this is achieved in practice by simply:
220+
*
221+
* 1. decrementing the read pointer. This effectively
222+
* opens space for the incoming message in the head of
223+
* the queue.
224+
*
225+
* 2. temporarily matching the read pointer with the
226+
* write pointer. This way the message will be written
227+
* in the recently opened space.
228+
*
229+
* 3. reverting the write pointer after the write to its
230+
* original value. The read pointer, by its turn, should
231+
* be always at the head, so it doesn't need reverting
232+
* because we just wrote to the head.
233+
*/
234+
char *original_write_ptr = msgq->write_ptr;
235+
236+
/* decrement the read pointer */
237+
msgq->read_ptr -= msgq->msg_size;
238+
if(msgq->read_ptr < msgq->buffer_start){
239+
msgq->read_ptr = msgq->buffer_end - msgq->msg_size;
240+
}
241+
242+
/* match read and write pointers */
243+
msgq->write_ptr = msgq->read_ptr;
244+
245+
/* put message in queue (like usual) */
246+
__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
247+
msgq->write_ptr < msgq->buffer_end);
248+
(void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size);
249+
msgq->write_ptr += msgq->msg_size;
250+
if (msgq->write_ptr == msgq->buffer_end) {
251+
msgq->write_ptr = msgq->buffer_start;
252+
}
253+
msgq->used_msgs++;
254+
255+
/* revert write pointer to the original value */
256+
msgq->write_ptr = original_write_ptr;
257+
258+
#ifdef CONFIG_POLL
259+
handle_poll_events(msgq, K_POLL_STATE_MSGQ_DATA_AVAILABLE);
260+
#endif /* CONFIG_POLL */
261+
}
262+
result = 0;
263+
} else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
264+
/* don't wait for message space to become available */
265+
result = -ENOMSG;
266+
} else {
267+
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);
268+
269+
/* wait for put message success, failure, or timeout */
270+
_current->base.swap_data = (void *) data;
271+
272+
result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
273+
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
274+
return result;
275+
}
276+
277+
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
278+
279+
z_reschedule(&msgq->lock, key);
280+
281+
return result;
282+
}
283+
190284
#ifdef CONFIG_USERSPACE
191285
static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
192286
k_timeout_t timeout)
@@ -196,6 +290,15 @@ static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
196290

197291
return z_impl_k_msgq_put(msgq, data, timeout);
198292
}
293+
294+
static inline int z_vrfy_k_msgq_put_urgent(struct k_msgq *msgq, const void *data,
295+
k_timeout_t timeout)
296+
{
297+
K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
298+
K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));
299+
300+
return z_impl_k_msgq_put_urgent(msgq, data, timeout);
301+
}
199302
#include <zephyr/syscalls/k_msgq_put_mrsh.c>
200303
#endif /* CONFIG_USERSPACE */
201304

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
3+
cmake_minimum_required(VERSION 3.20.0)
4+
5+
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
6+
project(msg_queue)
7+
8+
target_sources(app PRIVATE src/main.c)

samples/basic/msg_queue/README.rst

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
.. zephyr:code-sample:: msg_queue
2+
:name: Message Queue
3+
4+
Implements a basic message queue producer/consumer thread pair.
5+
6+
Overview
7+
********
8+
9+
A simple sample to demonstrate the basic usage of Zephyr message queues.
10+
a producer thread sends both normal and urgent messages.
11+
12+
Building and Running
13+
********************
14+
15+
This application can be built and executed on QEMU as follows:
16+
17+
.. zephyr-app-commands::
18+
:zephyr-app: samples/basic/msg_queue
19+
:host-os: unix
20+
:board: qemu_x86
21+
:goals: run
22+
:compact:
23+
24+
To build for another board, change "qemu_x86" above to that board's name.
25+
26+
Sample Output
27+
=============
28+
29+
Every normal message sending implies sending the message to the end of the
30+
queue, and the first message to go is the first to be delivered. Every "urgent"
31+
message, at its turn, implies sending the message to the beginning of the queue.
32+
33+
In this sample, one producer thread sends 1 urgent message for each 2 regular
34+
ones. Note that message C is the first retrieved because it was the last one
35+
sent as "urgent".
36+
37+
.. code-block:: console
38+
39+
[producer] sending: 0
40+
[producer] sending: 1
41+
[producer] sending: A (urgent)
42+
[producer] sending: 2
43+
[producer] sending: 3
44+
[producer] sending: B (urgent)
45+
[producer] sending: 4
46+
[producer] sending: 5
47+
[producer] sending: C (urgent)
48+
[consumer] got sequence: CBA012345
49+
50+
Exit QEMU by pressing :kbd:`CTRL+A` :kbd:`x`.

samples/basic/msg_queue/prj.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# nothing here

samples/basic/msg_queue/sample.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
sample:
2+
description: A message queue usage sample
3+
name: message queue sample
4+
common:
5+
tags:
6+
- message queues
7+
tests:
8+
sample.msgqueue:
9+
build_only: true
10+
platform_allow:
11+
- qemu_riscv32
12+
- qemu_cortex_a53
13+
integration_platforms:
14+
- qemu_riscv32
15+
tags:
16+
- message
17+
- queue

samples/basic/msg_queue/src/main.c

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2025 Instituto Superior de Engenharia do Porto (ISEP).
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#include <zephyr/kernel.h>
8+
9+
#define BUF_SIZE 10
10+
#define INACTIVE -1
11+
#define PRIORITY 5
12+
13+
K_MSGQ_DEFINE(my_msgq, sizeof(char), BUF_SIZE, 1);
14+
15+
void producer_function(void *rec, void *p2, void *p3){
16+
17+
k_tid_t receiving_thread = (k_tid_t) rec;
18+
19+
char normal_data = '0';
20+
char urgent_data = 'A';
21+
int total_sent = 0;
22+
23+
/*
24+
* sends messages every 100 msec, in repeating
25+
* sequence normal, normal, urgent, ...
26+
*/
27+
while(total_sent < (BUF_SIZE - 1)){
28+
for(int i = 0; i < 2; i++){
29+
printk("[producer] sending: %c\n", normal_data);
30+
k_msgq_put(&my_msgq, &normal_data, K_NO_WAIT);
31+
k_sleep(K_MSEC(100));
32+
normal_data++;
33+
}
34+
printk("[producer] sending: %c (urgent)\n", urgent_data);
35+
k_msgq_put_urgent(&my_msgq, &urgent_data, K_NO_WAIT);
36+
k_sleep(K_MSEC(100));
37+
urgent_data++;
38+
39+
total_sent += 3;
40+
}
41+
42+
/*
43+
* finished sending messages, now start the receiving thread.
44+
* keep in mind both threads can be running at the same time,
45+
* but in this example we wish to see the queue accumulate some
46+
* messages before the receiver thread starts reading them out.
47+
*/
48+
k_thread_start(receiving_thread);
49+
}
50+
51+
void consumer_function(void *p1, void *p2, void *p3){
52+
char received[BUF_SIZE];
53+
54+
for(int i = 0; i < (BUF_SIZE - 1); i++){
55+
k_msgq_get(&my_msgq, &received[i], K_NO_WAIT);
56+
}
57+
58+
received[BUF_SIZE - 1] = '\0';
59+
/* we expect to see CBA012345... */
60+
printk("[consumer] got sequence: %s\n", received);
61+
}
62+
63+
K_THREAD_DEFINE(consumer_thread, 2048, consumer_function, NULL, NULL, NULL, PRIORITY, 0, INACTIVE);
64+
K_THREAD_DEFINE(producer_thread, 2048, producer_function, ((void *) consumer_thread), NULL, NULL, PRIORITY, 0, 0);

0 commit comments

Comments
 (0)