Skip to content

Commit e8f2a16

Browse files
authored
Merge pull request #272 from Sherlock-Holo/rawop-with-flag
feat: add flags related methods
2 parents 1dd5683 + a0f85b1 commit e8f2a16

File tree

8 files changed

+87
-19
lines changed

8 files changed

+87
-19
lines changed

compio-driver/src/iour/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,8 @@ impl AsRawFd for Driver {
282282
}
283283
}
284284

285-
fn create_entry(entry: CEntry) -> Entry {
286-
let result = entry.result();
285+
fn create_entry(cq_entry: CEntry) -> Entry {
286+
let result = cq_entry.result();
287287
let result = if result < 0 {
288288
let result = if result == -libc::ECANCELED {
289289
libc::ETIMEDOUT
@@ -294,7 +294,10 @@ fn create_entry(entry: CEntry) -> Entry {
294294
} else {
295295
Ok(result as _)
296296
};
297-
Entry::new(entry.user_data() as _, result)
297+
let mut entry = Entry::new(cq_entry.user_data() as _, result);
298+
entry.set_flags(cq_entry.flags());
299+
300+
entry
298301
}
299302

300303
fn timespec(duration: std::time::Duration) -> Timespec {

compio-driver/src/key.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub(crate) struct RawOp<T: ?Sized> {
2121
// The metadata in `*mut RawOp<dyn OpCode>`
2222
metadata: usize,
2323
result: PushEntry<Option<Waker>, io::Result<usize>>,
24+
flags: u32,
2425
op: T,
2526
}
2627

@@ -84,6 +85,7 @@ impl<T: OpCode + 'static> Key<T> {
8485
cancelled: false,
8586
metadata: opcode_metadata::<T>(),
8687
result: PushEntry::Pending(None),
88+
flags: 0,
8789
op,
8890
});
8991
unsafe { Self::new_unchecked(Box::into_raw(raw_op) as _) }
@@ -154,6 +156,14 @@ impl<T: ?Sized> Key<T> {
154156
this.cancelled
155157
}
156158

159+
pub(crate) fn set_flags(&mut self, flags: u32) {
160+
self.as_opaque_mut().flags = flags;
161+
}
162+
163+
pub(crate) fn flags(&self) -> u32 {
164+
self.as_opaque().flags
165+
}
166+
157167
/// Whether the op is completed.
158168
pub(crate) fn has_result(&self) -> bool {
159169
self.as_opaque().result.is_ready()

compio-driver/src/lib.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,8 @@ impl Proactor {
273273
}
274274

275275
/// Poll the driver and get completed entries.
276-
/// You need to call [`Proactor::pop`] to get the pushed operations.
276+
/// You need to call [`Proactor::pop`] to get the pushed
277+
/// operations.
277278
pub fn poll(
278279
&mut self,
279280
timeout: Option<Duration>,
@@ -290,11 +291,12 @@ impl Proactor {
290291
/// # Panics
291292
/// This function will panic if the requested operation has not been
292293
/// completed.
293-
pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
294+
pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
294295
instrument!(compio_log::Level::DEBUG, "pop", ?op);
295296
if op.has_result() {
297+
let flags = op.flags();
296298
// SAFETY: completed.
297-
PushEntry::Ready(unsafe { op.into_inner() })
299+
PushEntry::Ready((unsafe { op.into_inner() }, flags))
298300
} else {
299301
PushEntry::Pending(op)
300302
}
@@ -322,18 +324,33 @@ impl AsRawFd for Proactor {
322324
pub(crate) struct Entry {
323325
user_data: usize,
324326
result: io::Result<usize>,
327+
flags: u32,
325328
}
326329

327330
impl Entry {
328331
pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
329-
Self { user_data, result }
332+
Self {
333+
user_data,
334+
result,
335+
flags: 0,
336+
}
337+
}
338+
339+
#[cfg(all(target_os = "linux", feature = "io-uring"))]
340+
// this method only used by in io-uring driver
341+
pub(crate) fn set_flags(&mut self, flags: u32) {
342+
self.flags = flags;
330343
}
331344

332345
/// The user-defined data returned by [`Proactor::push`].
333346
pub fn user_data(&self) -> usize {
334347
self.user_data
335348
}
336349

350+
pub fn flags(&self) -> u32 {
351+
self.flags
352+
}
353+
337354
/// The result of the operation.
338355
pub fn into_result(self) -> io::Result<usize> {
339356
self.result
@@ -357,6 +374,7 @@ impl<E: Extend<usize>> Extend<Entry> for OutEntries<'_, E> {
357374
self.entries.extend(iter.into_iter().filter_map(|e| {
358375
let user_data = e.user_data();
359376
let mut op = unsafe { Key::<()>::new_unchecked(user_data) };
377+
op.set_flags(e.flags());
360378
if op.set_result(e.into_result()) {
361379
// SAFETY: completed and cancelled.
362380
let _ = unsafe { op.into_box() };

compio-driver/tests/file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ fn push_and_wait<O: OpCode + 'static>(driver: &mut Proactor, op: O) -> BufResult
5353
driver.poll(None, &mut entries).unwrap();
5454
}
5555
assert_eq!(entries[0], user_data.user_data());
56-
driver.pop(user_data).take_ready().unwrap()
56+
driver.pop(user_data).take_ready().unwrap().0
5757
}
5858
}
5959
}

compio-runtime/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,6 @@ pub mod time;
2222
pub use async_task::Task;
2323
pub use attacher::*;
2424
use compio_buf::BufResult;
25-
pub use runtime::{spawn, spawn_blocking, submit, JoinHandle, Runtime, RuntimeBuilder};
25+
pub use runtime::{
26+
spawn, spawn_blocking, submit, submit_with_flags, JoinHandle, Runtime, RuntimeBuilder,
27+
};

compio-runtime/src/runtime/mod.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use send_wrapper::SendWrapper;
3131

3232
#[cfg(feature = "time")]
3333
use crate::runtime::time::{TimerFuture, TimerRuntime};
34-
use crate::{runtime::op::OpFuture, BufResult};
34+
use crate::{runtime::op::OpFlagsFuture, BufResult};
3535

3636
scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
3737

@@ -235,9 +235,26 @@ impl Runtime {
235235
///
236236
/// You only need this when authoring your own [`OpCode`].
237237
pub fn submit<T: OpCode + 'static>(&self, op: T) -> impl Future<Output = BufResult<usize, T>> {
238+
self.submit_with_flags(op).map(|(res, _)| res)
239+
}
240+
241+
/// Submit an operation to the runtime.
242+
///
243+
/// The difference between [`Runtime::submit`] is this method will return
244+
/// the flags
245+
///
246+
/// You only need this when authoring your own [`OpCode`].
247+
pub fn submit_with_flags<T: OpCode + 'static>(
248+
&self,
249+
op: T,
250+
) -> impl Future<Output = (BufResult<usize, T>, u32)> {
238251
match self.submit_raw(op) {
239-
PushEntry::Pending(user_data) => Either::Left(OpFuture::new(user_data)),
240-
PushEntry::Ready(res) => Either::Right(ready(res)),
252+
PushEntry::Pending(user_data) => Either::Left(OpFlagsFuture::new(user_data)),
253+
PushEntry::Ready(res) => {
254+
// submit_flags won't be ready immediately, if ready, it must be error without
255+
// flags
256+
Either::Right(ready((res, 0)))
257+
}
241258
}
242259
}
243260

@@ -264,7 +281,7 @@ impl Runtime {
264281
&self,
265282
cx: &mut Context,
266283
op: Key<T>,
267-
) -> PushEntry<Key<T>, BufResult<usize, T>> {
284+
) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
268285
instrument!(compio_log::Level::DEBUG, "poll_task", ?op);
269286
let mut driver = self.driver.borrow_mut();
270287
driver.pop(op).map_pending(|mut k| {
@@ -435,3 +452,16 @@ pub fn spawn_blocking<T: Send + 'static>(
435452
pub fn submit<T: OpCode + 'static>(op: T) -> impl Future<Output = BufResult<usize, T>> {
436453
Runtime::with_current(|r| r.submit(op))
437454
}
455+
456+
/// Submit an operation to the current runtime, and return a future for it with
457+
/// flags.
458+
///
459+
/// ## Panics
460+
///
461+
/// This method doesn't create runtime. It tries to obtain the current runtime
462+
/// by [`Runtime::with_current`].
463+
pub fn submit_with_flags<T: OpCode + 'static>(
464+
op: T,
465+
) -> impl Future<Output = (BufResult<usize, T>, u32)> {
466+
Runtime::with_current(|r| r.submit_with_flags(op))
467+
}

compio-runtime/src/runtime/op.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@ use compio_driver::{Key, OpCode, PushEntry};
1010
use crate::runtime::Runtime;
1111

1212
#[derive(Debug)]
13-
pub struct OpFuture<T: OpCode> {
13+
pub struct OpFlagsFuture<T: OpCode> {
1414
key: Option<Key<T>>,
1515
}
1616

17-
impl<T: OpCode> OpFuture<T> {
17+
impl<T: OpCode> OpFlagsFuture<T> {
1818
pub fn new(key: Key<T>) -> Self {
1919
Self { key: Some(key) }
2020
}
2121
}
2222

23-
impl<T: OpCode> Future for OpFuture<T> {
24-
type Output = BufResult<usize, T>;
23+
impl<T: OpCode> Future for OpFlagsFuture<T> {
24+
type Output = (BufResult<usize, T>, u32);
2525

2626
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2727
let res = Runtime::with_current(|r| r.poll_task(cx, self.key.take().unwrap()));
@@ -35,7 +35,7 @@ impl<T: OpCode> Future for OpFuture<T> {
3535
}
3636
}
3737

38-
impl<T: OpCode> Drop for OpFuture<T> {
38+
impl<T: OpCode> Drop for OpFlagsFuture<T> {
3939
fn drop(&mut self) {
4040
if let Some(key) = self.key.take() {
4141
Runtime::with_current(|r| r.cancel_op(key))

compio/examples/driver.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,12 @@ fn push_and_wait<O: OpCode + 'static>(driver: &mut Proactor, op: O) -> (usize, O
5454
driver.poll(None, &mut entries).unwrap();
5555
}
5656
assert_eq!(entries[0], user_data.user_data());
57-
driver.pop(user_data).take_ready().unwrap().unwrap()
57+
driver
58+
.pop(user_data)
59+
.map_ready(|(res, _)| res)
60+
.take_ready()
61+
.unwrap()
62+
.unwrap()
5863
}
5964
}
6065
}

0 commit comments

Comments
 (0)