Skip to content

Commit 9e3048a

Browse files
committed
wip: unfinished web example
1 parent 549491f commit 9e3048a

File tree

6 files changed

+580
-376
lines changed

6 files changed

+580
-376
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ members = [
99
"examples/harder",
1010
"examples/hardest",
1111
"examples/read",
12+
"examples/web",
1213
"examples/write",
1314
"examples/cluster_node"
1415
]

examples/web/Cargo.toml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[package]
2+
name = "web"
3+
version = "0.1.0"
4+
edition = "2021"
5+
license = "BSD-2-Clause"
6+
7+
[dependencies.tarantool]
8+
path = "../../tarantool"
9+
10+
[dependencies]
11+
async-trait = "*"
12+
tide = "*"
13+
log = "=0.4.16"
14+
mio = { version = "0.8", features = ["os-poll", "net"] }
15+
16+
[lib]
17+
crate-type = ["cdylib"]

examples/web/src/lib.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
//! tide is useless, it's just an abstraction over async_h1
2+
//! can we use single-threaded tokio in a separate fiber? whose going to be
3+
//! wakign it up?
4+
use tarantool::{
5+
coio::coio_wait,
6+
ffi::tarantool::{CoIOFlags, TIMEOUT_INFINITY},
7+
fiber::{self, future::Executor},
8+
util::IntoClones,
9+
r#async::poll_fn,
10+
};
11+
12+
use std::{
13+
collections::HashMap,
14+
cell::RefCell,
15+
fmt,
16+
io,
17+
os::unix::io::AsRawFd,
18+
rc::Rc,
19+
time::{Instant, Duration},
20+
task::Poll,
21+
};
22+
23+
use mio::{
24+
net,
25+
Token,
26+
};
27+
28+
use tide::{Request, listener::{Listener, ToListener}};
29+
30+
#[derive(Debug)]
31+
struct TcpListener {
32+
server: Option<tide::Server<()>>,
33+
addr: std::net::SocketAddr,
34+
listener: Option<net::TcpListener>,
35+
poll: Rc<RefCell<mio::Poll>>,
36+
token: Token,
37+
evs: Rc<RefCell<HashMap<Token, mio::event::Event>>>,
38+
}
39+
40+
impl TcpListener {
41+
fn new(
42+
addr: &str,
43+
poll: Rc<RefCell<mio::Poll>>,
44+
token: Token,
45+
evs: Rc<RefCell<HashMap<Token, mio::event::Event>>>,
46+
) -> Self {
47+
Self {
48+
server: None,
49+
addr: addr.parse().unwrap(),
50+
listener: None,
51+
poll,
52+
token,
53+
evs,
54+
}
55+
}
56+
}
57+
58+
unsafe impl Send for TcpListener {}
59+
unsafe impl Sync for TcpListener {}
60+
61+
impl fmt::Display for TcpListener {
62+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
63+
f.write_str("fuck you")
64+
}
65+
}
66+
67+
impl ToListener<()> for TcpListener {
68+
type Listener = Self;
69+
70+
fn to_listener(self) -> io::Result<Self::Listener> {
71+
Ok(self)
72+
}
73+
}
74+
75+
#[async_trait::async_trait]
76+
impl Listener<()> for TcpListener {
77+
async fn bind(&mut self, server: tide::Server<()>) -> io::Result<()> {
78+
self.server = Some(server);
79+
// TODO: non blocking
80+
self.listener = Some(net::TcpListener::bind(self.addr)?);
81+
self.poll.borrow_mut().registry()
82+
.register(self.listener.as_mut().unwrap(), self.token, mio::Interest::READABLE)?;
83+
Ok(())
84+
}
85+
86+
async fn accept(&mut self) -> io::Result<()> {
87+
let new_conn = poll_fn(|ctx| {
88+
let maybe_event = self.evs.borrow_mut().remove(&self.token);
89+
if let Some(event) = maybe_event {
90+
match self.listener.as_ref().unwrap().accept() {
91+
Ok((new_conn, _)) => Poll::Ready(Ok(new_conn)),
92+
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
93+
self.evs.borrow_mut().insert(self.token, event);
94+
Poll::Pending
95+
}
96+
Err(e) => Poll::Ready(Err(e)),
97+
}
98+
} else {
99+
Poll::Pending
100+
}
101+
}).await;
102+
}
103+
104+
fn info(&self) -> Vec<tide::listener::ListenInfo> {
105+
todo!()
106+
}
107+
}
108+
109+
#[tarantool::proc]
110+
fn web() {
111+
let (exe1, exe2) = Rc::new(Executor::new()).into_clones();
112+
113+
let executor_jh = fiber::defer(|| {
114+
let mut last_awoke = Instant::now();
115+
while exe1.has_tasks() {
116+
exe1.do_loop();
117+
eprintln!("slept for {:?}", last_awoke.elapsed());
118+
last_awoke = Instant::now();
119+
}
120+
});
121+
122+
let (ev, ev2) = Rc::new(RefCell::new(HashMap::new())).into_clones();
123+
let (poll, poll2) = Rc::new(RefCell::new(mio::Poll::new().unwrap())).into_clones();
124+
let io_cond = exe1.cond.clone();
125+
126+
let io_jh = fiber::defer(|| {
127+
let mut events = mio::Events::with_capacity(1024);
128+
loop {
129+
poll.borrow_mut().poll(&mut events, Some(Duration::ZERO)).unwrap();
130+
// TODO: process events
131+
if !events.is_empty() {
132+
io_cond.signal()
133+
}
134+
for e in &events {
135+
match ev.borrow_mut().entry(e.token()) {
136+
std::collections::hash_map::Entry::Occupied(_) => {
137+
panic!("previous event wasn't handled: {:?}", e.token())
138+
}
139+
std::collections::hash_map::Entry::Vacant(kv) => {
140+
kv.insert(e.clone());
141+
}
142+
}
143+
}
144+
let fd = poll.borrow().as_raw_fd();
145+
coio_wait(
146+
fd,
147+
CoIOFlags::READ,
148+
unsafe { TIMEOUT_INFINITY },
149+
).unwrap();
150+
}
151+
});
152+
153+
exe1.block_on(async move {
154+
let mut app = tide::new();
155+
156+
app.at("/").get(|request: Request<_>| async move {
157+
Ok(format!(
158+
"Hi! You reached this app through: {}",
159+
request.local_addr().unwrap_or("an unknown port")
160+
))
161+
});
162+
163+
let l = TcpListener::new("localhost:42069", poll2, Token(42069), ev2);
164+
app.listen(l).await.unwrap();
165+
});
166+
167+
executor_jh.join();
168+
std::mem::forget(io_jh);
169+
}

tarantool/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ thiserror = "1.0.30"
2626
ipnetwork = { version = "0.18.0", optional = true }
2727
lazy_static = { version = "1.4", optional = true }
2828
libc = "0.2"
29-
log = "0.4"
29+
log = "=0.4.16"
30+
slog-stdlog = "4.1.1"
3031
num-traits = "0.2"
3132
num-derive = "0.3"
3233
tlua = { path = "../tlua", version = "0.6.1" }

0 commit comments

Comments
 (0)