From dcd4198c1d3865b5968954bc3e3329a6862615e3 Mon Sep 17 00:00:00 2001 From: William Salmon Date: Mon, 14 Jul 2025 17:46:01 +0100 Subject: [PATCH] Add a timed send to match the timed receive --- CHANGELOG.md | 7 ++++++ src/mqueue.rs | 20 ++++++++++++++++ test/test_mq.rs | 63 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e320bc824..a1d8c2ff14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,13 @@ This project adheres to [Semantic Versioning](https://semver.org/). # Change Log +## [Unreleased] - ReleaseDate + +### Added + +- Added `mq_timedsend` to `::nix::mqueue`. + ([#2650](https://github.com/nix-rust/nix/pull/2650)) + ## [0.30.1] - 2025-05-04 ### Fixed diff --git a/src/mqueue.rs b/src/mqueue.rs index 7684d56858..9b8e754a3e 100644 --- a/src/mqueue.rs +++ b/src/mqueue.rs @@ -235,6 +235,26 @@ feature! { }; Errno::result(res).map(|r| r as usize) } + /// Send a message to a message queue with a timeout + /// + /// See also ['mq_timedsend(2)'](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_send.html) + pub fn mq_timedsend( + mqdes: &MqdT, + message: &[u8], + msg_prio: u32, + abstime: &TimeSpec, + ) -> Result<()> { + let res = unsafe { + libc::mq_timedsend( + mqdes.0, + message.as_ptr().cast(), + message.len(), + msg_prio, + abstime.as_ref(), + ) + }; + Errno::result(res).map(drop) + } } /// Send a message to a message queue diff --git a/test/test_mq.rs b/test/test_mq.rs index 874a72b44d..d34881dc70 100644 --- a/test/test_mq.rs +++ b/test/test_mq.rs @@ -4,6 +4,7 @@ use std::str; use nix::errno::Errno; use nix::mqueue::{ mq_attr_member_t, mq_close, mq_open, mq_receive, mq_send, mq_timedreceive, + mq_timedsend, mq_unlink, }; use nix::mqueue::{MQ_OFlag, MqAttr}; use nix::sys::stat::Mode; @@ -89,6 +90,68 @@ fn test_mq_timedreceive() { assert_eq!(msg_to_send, str::from_utf8(&buf[0..len]).unwrap()); } +#[test] +fn test_mq_timedsend() { + const MSG_SIZE: mq_attr_member_t = 32; + let attr = MqAttr::new(0, 10, MSG_SIZE, 0); + let mq_name = "/a_nix_test_queue_timedsend"; + + let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY; + let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH; + let r0 = mq_open(mq_name, oflag0, mode, Some(&attr)); + if let Err(Errno::ENOSYS) = r0 { + println!("message queues not supported or module not loaded?"); + return; + }; + let mqd0 = r0.unwrap(); + let msg_to_send = "msg_1"; + let abstime = + clock_gettime(ClockId::CLOCK_REALTIME).unwrap() + TimeSpec::seconds(1); + mq_timedsend(&mqd0, msg_to_send.as_bytes(), 1, &abstime).unwrap(); + + let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY; + let mqd1 = mq_open(mq_name, oflag1, mode, Some(&attr)).unwrap(); + let mut buf = [0u8; 32]; + let mut prio = 0u32; + let len = mq_receive(&mqd1, &mut buf, &mut prio).unwrap(); + assert_eq!(prio, 1); + + mq_close(mqd1).unwrap(); + mq_close(mqd0).unwrap(); + assert_eq!(msg_to_send, str::from_utf8(&buf[0..len]).unwrap()); + mq_unlink(mq_name).unwrap(); +} + +#[test] +fn test_mq_timedsend_full() { + const MSG_SIZE: mq_attr_member_t = 32; + let attr = MqAttr::new(0, 10, MSG_SIZE, 0); + let mq_name = "/a_nix_test_queue_fill"; + + let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY | MQ_OFlag::O_CLOEXEC; + let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH; + let r0 = mq_open(mq_name, oflag0, mode, Some(&attr)); + if let Err(Errno::ENOSYS) = r0 { + println!("message queues not supported or module not loaded?"); + return; + }; + let mqd0 = r0.unwrap(); + let msg_to_send = "msg_1"; + for _i in 0..10 { + mq_send(&mqd0, msg_to_send.as_bytes(), 1).unwrap(); + } + + let abstime = + clock_gettime(ClockId::CLOCK_REALTIME).unwrap() + TimeSpec::seconds(1); + assert_eq!( + mq_timedsend(&mqd0, msg_to_send.as_bytes(), 1, &abstime), + Err(Errno::ETIMEDOUT) + ); + + mq_close(mqd0).unwrap(); + mq_unlink(mq_name).unwrap(); +} + #[test] fn test_mq_getattr() { use nix::mqueue::mq_getattr;