Skip to content

Commit a1d15f2

Browse files
authored
minimal support of System type with io-uring (#395)
1 parent 70ea532 commit a1d15f2

File tree

5 files changed

+170
-92
lines changed

5 files changed

+170
-92
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,7 @@ jobs:
112112
- name: tests
113113
if: matrix.target.os == 'ubuntu-latest'
114114
run: |
115-
cargo ci-test
116-
cargo ci-test-rt-linux
117-
cargo ci-test-server-linux
115+
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-rt-linux && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-server-linux"
118116
119117
- name: Clear the cargo caches
120118
run: |
@@ -141,7 +139,8 @@ jobs:
141139
args: cargo-hack
142140

143141
- name: tests
144-
run: cargo ci-test-lower-msrv
142+
run: |
143+
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=1.46 cargo ci-test-lower-msrv"
145144
146145
- name: Clear the cargo caches
147146
run: |

actix-rt/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
//! blocking task thread-pool using [`task::spawn_blocking`].
1616
//!
1717
//! # Examples
18-
//! ```
18+
//! ```no_run
1919
//! use std::sync::mpsc;
2020
//! use actix_rt::{Arbiter, System};
2121
//!

actix-rt/src/system.rs

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::{
1111
use futures_core::ready;
1212
use tokio::sync::{mpsc, oneshot};
1313

14-
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
14+
use crate::{arbiter::ArbiterHandle, Arbiter};
1515

1616
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
1717

@@ -29,6 +29,7 @@ pub struct System {
2929
arbiter_handle: ArbiterHandle,
3030
}
3131

32+
#[cfg(not(feature = "io-uring"))]
3233
impl System {
3334
/// Create a new system.
3435
///
@@ -37,7 +38,7 @@ impl System {
3738
#[allow(clippy::new_ret_no_self)]
3839
pub fn new() -> SystemRunner {
3940
Self::with_tokio_rt(|| {
40-
default_tokio_runtime()
41+
crate::runtime::default_tokio_runtime()
4142
.expect("Default Actix (Tokio) runtime could not be created.")
4243
})
4344
}
@@ -53,7 +54,7 @@ impl System {
5354
let (stop_tx, stop_rx) = oneshot::channel();
5455
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
5556

56-
let rt = Runtime::from(runtime_factory());
57+
let rt = crate::runtime::Runtime::from(runtime_factory());
5758
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
5859
let system = System::construct(sys_tx, sys_arbiter.clone());
5960

@@ -72,7 +73,32 @@ impl System {
7273
system,
7374
}
7475
}
76+
}
7577

78+
#[cfg(feature = "io-uring")]
79+
impl System {
80+
/// Create a new system.
81+
///
82+
/// # Panics
83+
/// Panics if underlying Tokio runtime can not be created.
84+
#[allow(clippy::new_ret_no_self)]
85+
pub fn new() -> SystemRunner {
86+
SystemRunner
87+
}
88+
89+
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
90+
///
91+
/// [tokio-runtime]: tokio::runtime::Runtime
92+
#[doc(hidden)]
93+
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
94+
where
95+
F: Fn() -> tokio::runtime::Runtime,
96+
{
97+
unimplemented!("System::with_tokio_rt is not implemented yet")
98+
}
99+
}
100+
101+
impl System {
76102
/// Constructs new system and registers it on the current thread.
77103
pub(crate) fn construct(
78104
sys_tx: mpsc::UnboundedSender<SystemCommand>,
@@ -149,16 +175,18 @@ impl System {
149175
}
150176
}
151177

178+
#[cfg(not(feature = "io-uring"))]
152179
/// Runner that keeps a [System]'s event loop alive until stop message is received.
153180
#[must_use = "A SystemRunner does nothing unless `run` is called."]
154181
#[derive(Debug)]
155182
pub struct SystemRunner {
156-
rt: Runtime,
183+
rt: crate::runtime::Runtime,
157184
stop_rx: oneshot::Receiver<i32>,
158185
#[allow(dead_code)]
159186
system: System,
160187
}
161188

189+
#[cfg(not(feature = "io-uring"))]
162190
impl SystemRunner {
163191
/// Starts event loop and will return once [System] is [stopped](System::stop).
164192
pub fn run(self) -> io::Result<()> {
@@ -188,6 +216,45 @@ impl SystemRunner {
188216
}
189217
}
190218

219+
#[cfg(feature = "io-uring")]
220+
/// Runner that keeps a [System]'s event loop alive until stop message is received.
221+
#[must_use = "A SystemRunner does nothing unless `run` is called."]
222+
#[derive(Debug)]
223+
pub struct SystemRunner;
224+
225+
#[cfg(feature = "io-uring")]
226+
impl SystemRunner {
227+
/// Starts event loop and will return once [System] is [stopped](System::stop).
228+
pub fn run(self) -> io::Result<()> {
229+
unimplemented!("SystemRunner::run is not implemented yet")
230+
}
231+
232+
/// Runs the provided future, blocking the current thread until the future completes.
233+
#[inline]
234+
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
235+
tokio_uring::start(async move {
236+
let (stop_tx, stop_rx) = oneshot::channel();
237+
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
238+
239+
let sys_arbiter = Arbiter::in_new_system();
240+
let system = System::construct(sys_tx, sys_arbiter.clone());
241+
242+
system
243+
.tx()
244+
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
245+
.unwrap();
246+
247+
// init background system arbiter
248+
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
249+
tokio_uring::spawn(sys_ctrl);
250+
251+
let res = fut.await;
252+
drop(stop_rx);
253+
res
254+
})
255+
}
256+
}
257+
191258
#[derive(Debug)]
192259
pub(crate) enum SystemCommand {
193260
Exit(i32),

actix-rt/tests/tests.rs

Lines changed: 44 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use std::{
22
future::Future,
3-
sync::mpsc::channel,
4-
thread,
53
time::{Duration, Instant},
64
};
75

86
use actix_rt::{task::JoinError, Arbiter, System};
9-
use tokio::sync::oneshot;
7+
8+
#[cfg(not(feature = "io-uring"))]
9+
use {
10+
std::{sync::mpsc::channel, thread},
11+
tokio::sync::oneshot,
12+
};
1013

1114
#[test]
1215
fn await_for_timer() {
@@ -103,6 +106,10 @@ fn wait_for_spawns() {
103106
assert!(rt.block_on(handle).is_err());
104107
}
105108

109+
// Temporary disabled tests for io-uring feature.
110+
// They should be enabled when possible.
111+
112+
#[cfg(not(feature = "io-uring"))]
106113
#[test]
107114
fn arbiter_spawn_fn_runs() {
108115
let _ = System::new();
@@ -119,6 +126,7 @@ fn arbiter_spawn_fn_runs() {
119126
arbiter.join().unwrap();
120127
}
121128

129+
#[cfg(not(feature = "io-uring"))]
122130
#[test]
123131
fn arbiter_handle_spawn_fn_runs() {
124132
let sys = System::new();
@@ -141,6 +149,7 @@ fn arbiter_handle_spawn_fn_runs() {
141149
sys.run().unwrap();
142150
}
143151

152+
#[cfg(not(feature = "io-uring"))]
144153
#[test]
145154
fn arbiter_drop_no_panic_fn() {
146155
let _ = System::new();
@@ -152,6 +161,7 @@ fn arbiter_drop_no_panic_fn() {
152161
arbiter.join().unwrap();
153162
}
154163

164+
#[cfg(not(feature = "io-uring"))]
155165
#[test]
156166
fn arbiter_drop_no_panic_fut() {
157167
let _ = System::new();
@@ -163,18 +173,7 @@ fn arbiter_drop_no_panic_fut() {
163173
arbiter.join().unwrap();
164174
}
165175

166-
#[test]
167-
#[should_panic]
168-
fn no_system_current_panic() {
169-
System::current();
170-
}
171-
172-
#[test]
173-
#[should_panic]
174-
fn no_system_arbiter_new_panic() {
175-
Arbiter::new();
176-
}
177-
176+
#[cfg(not(feature = "io-uring"))]
178177
#[test]
179178
fn system_arbiter_spawn() {
180179
let runner = System::new();
@@ -205,6 +204,7 @@ fn system_arbiter_spawn() {
205204
thread.join().unwrap();
206205
}
207206

207+
#[cfg(not(feature = "io-uring"))]
208208
#[test]
209209
fn system_stop_stops_arbiters() {
210210
let sys = System::new();
@@ -293,6 +293,18 @@ fn new_arbiter_with_tokio() {
293293
assert!(!counter.load(Ordering::SeqCst));
294294
}
295295

296+
#[test]
297+
#[should_panic]
298+
fn no_system_current_panic() {
299+
System::current();
300+
}
301+
302+
#[test]
303+
#[should_panic]
304+
fn no_system_arbiter_new_panic() {
305+
Arbiter::new();
306+
}
307+
296308
#[test]
297309
fn try_current_no_system() {
298310
assert!(System::try_current().is_none())
@@ -330,28 +342,27 @@ fn spawn_local() {
330342
#[cfg(all(target_os = "linux", feature = "io-uring"))]
331343
#[test]
332344
fn tokio_uring_arbiter() {
333-
let system = System::new();
334-
let (tx, rx) = std::sync::mpsc::channel();
335-
336-
Arbiter::new().spawn(async move {
337-
let handle = actix_rt::spawn(async move {
338-
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
339-
let buf = b"Hello World!";
345+
System::new().block_on(async {
346+
let (tx, rx) = std::sync::mpsc::channel();
340347

341-
let (res, _) = f.write_at(&buf[..], 0).await;
342-
assert!(res.is_ok());
348+
Arbiter::new().spawn(async move {
349+
let handle = actix_rt::spawn(async move {
350+
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
351+
let buf = b"Hello World!";
343352

344-
f.sync_all().await.unwrap();
345-
f.close().await.unwrap();
353+
let (res, _) = f.write_at(&buf[..], 0).await;
354+
assert!(res.is_ok());
346355

347-
std::fs::remove_file("test.txt").unwrap();
348-
});
356+
f.sync_all().await.unwrap();
357+
f.close().await.unwrap();
349358

350-
handle.await.unwrap();
351-
tx.send(true).unwrap();
352-
});
359+
std::fs::remove_file("test.txt").unwrap();
360+
});
353361

354-
assert!(rx.recv().unwrap());
362+
handle.await.unwrap();
363+
tx.send(true).unwrap();
364+
});
355365

356-
drop(system);
366+
assert!(rx.recv().unwrap());
367+
})
357368
}

0 commit comments

Comments
 (0)