Skip to content

Commit a5b4ed0

Browse files
committed
feat: add createReuseportFd
1 parent 4a39bea commit a5b4ed0

File tree

8 files changed

+253
-18
lines changed

8 files changed

+253
-18
lines changed

README.md

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,6 @@
11
# unix-socket
22

3-
## TODO
43

5-
- support `socket.setSendBufferSize(size)`
6-
- current, server don't work with cluster module
4+
## TODO
75

8-
## Tmp
9-
- Connect, Accept
10-
- how to make connect async?
11-
- set nonblock and connect
12-
- wait POLLOUT?
13-
- Mode
14-
- all callbacks, a lot
15-
- event emitter instead
6+
- mark that server don't work with cluster module

__test__/seqpacket.spec.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,4 +502,8 @@ if (!kIsDarwin) {
502502
});
503503
});
504504
});
505+
} else {
506+
describe('seqpacket', () => {
507+
it('(tests skipped)', () => {})
508+
})
505509
}

__test__/socket.spec.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import * as net from 'net'
2+
import { createReuseportFd as createFd, closeFd } from '../js/index'
3+
import { hasIPv6 } from './util'
4+
5+
describe('tcp', () => {
6+
describe('createFd', () => {
7+
it('should work', async () => {
8+
const host = '0.0.0.0'
9+
let port = 0;
10+
11+
async function createServer() {
12+
const fd = createFd(port, host);
13+
14+
const server = await new Promise<net.Server>((resolve, reject) => {
15+
const server = net.createServer()
16+
17+
server.listen({
18+
fd,
19+
}, () => {
20+
resolve(server)
21+
})
22+
})
23+
24+
port = (server.address() as any).port
25+
26+
return server
27+
}
28+
29+
const servers = [];
30+
for (let i = 0; i < 5; i += 1) {
31+
const server = await createServer()
32+
servers.push(server);
33+
}
34+
35+
const pList = servers.map(server => {
36+
return new Promise((resolve, reject) => {
37+
server.once('connection', (socket) => {
38+
socket.on('data', buf => {
39+
resolve(buf)
40+
})
41+
})
42+
})
43+
})
44+
45+
const buf = Buffer.from('hello');
46+
const socket = net.connect(port, host, () => {
47+
socket.write(buf);
48+
});
49+
50+
const ret = await Promise.race(pList);
51+
expect(ret.toString()).toBe(buf.toString())
52+
53+
socket.destroy();
54+
55+
servers.forEach(server => server.close());
56+
})
57+
58+
if (hasIPv6()) {
59+
it('should work with ipv6', async () => {
60+
const host = '::1'
61+
let port = 0;
62+
63+
const fd = createFd(port, host);
64+
65+
const server = await new Promise<net.Server>((resolve, reject) => {
66+
const server = net.createServer()
67+
68+
server.listen({
69+
fd,
70+
}, () => {
71+
resolve(server)
72+
})
73+
})
74+
port = (server.address() as any).port
75+
const p = new Promise((resolve, reject) => {
76+
server.once('connection', (socket) => {
77+
socket.on('data', buf => {
78+
resolve(buf)
79+
})
80+
})
81+
})
82+
83+
const buf = Buffer.from('hello');
84+
const socket = net.connect(port, host, () => {
85+
socket.write(buf);
86+
});
87+
88+
const ret = await p;
89+
expect(ret.toString()).toBe(buf.toString())
90+
91+
socket.destroy();
92+
93+
server.close();
94+
});
95+
}
96+
})
97+
98+
describe('closeFd', () => {
99+
it('should work', async () => {
100+
const fd = createFd(0)
101+
102+
closeFd(fd)
103+
})
104+
});
105+
})

__test__/util.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,14 @@ export function createDefer<T>() {
3131
reject,
3232
}
3333
}
34+
35+
const isWindows = os.platform() === 'win32';
36+
37+
export function hasIPv6() {
38+
const iFaces = os.networkInterfaces();
39+
const re = isWindows ? /Loopback Pseudo-Interface/ : /lo/;
40+
return Object.keys(iFaces).some((name) => {
41+
return re.test(name) &&
42+
iFaces[name].some(({ family }) => family === 'IPv6');
43+
});
44+
}

js/addon.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ export function dgramGetSendBufferSize(ee: object): number
2424
export function dgramSetSendBufferSize(ee: object, size: number): void
2525
export function dgramSendTo(ee: object, buf: Buffer, offset: number, length: number, path: string, cb?: (...args: any[]) => any | undefined | null): void
2626
export function startRecv(ee: object): void
27+
export function socketNewSoReuseportFd(domain: string, port: number, ip: string): number
28+
export function socketClose(fd: number): void

js/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export { SendCb, DgramSocket } from './dgram'
22
export { NotifyCb, SeqpacketSocket, SeqpacketServer } from './seqpacket'
3+
export { createReuseportFd, closeFd } from './socket'

js/socket.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { isIPv4, isIP } from 'net';
2+
import { socketNewSoReuseportFd, socketClose } from './addon'
3+
4+
export function createReuseportFd(port: number = 0, host: string = '0.0.0.0'): number {
5+
if (!isIP(host)) {
6+
throw new Error('invalid host');
7+
}
8+
9+
const domain = isIPv4(host) ? 'ipv4' : 'ipv6'
10+
11+
const fd = socketNewSoReuseportFd(domain, port, host);
12+
13+
return fd;
14+
}
15+
16+
export function closeFd(fd: number) {
17+
socketClose(fd)
18+
}

src/socket.rs

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
1+
use std::ffi::CString;
12
use std::mem;
2-
use std::{str::FromStr};
3+
use std::str::FromStr;
34

4-
use libc::{sockaddr_un};
5-
use napi::{Result, Env, Ref, JsFunction, JsUnknown, JsObject};
5+
use crate::util::{error, get_err, resolve_libc_err, resolve_uv_err};
6+
use libc::{c_void, sockaddr_storage, sockaddr_un};
7+
use napi::{Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result};
68
use uv_sys::sys;
7-
use crate::util::{get_err, error};
89

9-
pub (crate) fn get_loop(env: &Env) -> Result<*mut sys::uv_loop_t> {
10+
pub(crate) fn get_loop(env: &Env) -> Result<*mut sys::uv_loop_t> {
1011
Ok(env.get_uv_event_loop()? as *mut _ as *mut sys::uv_loop_t)
1112
}
1213

1314
pub(crate) fn close(fd: i32) -> Result<()> {
1415
let ret = unsafe { libc::close(fd) };
1516

17+
// TODO should we loop?
1618
if ret != 0 {
1719
if ret != libc::EINTR && ret != libc::EINPROGRESS {
1820
return Err(get_err());
@@ -53,7 +55,6 @@ pub(crate) fn sockaddr_from_string(bytes: &str) -> Result<(sockaddr_un, usize)>
5355
Ok((sockaddr, mem::size_of::<sockaddr_un>()))
5456
}
5557

56-
5758
pub(crate) struct Emitter {
5859
env: Env,
5960
emit_ref: Option<Ref<()>>,
@@ -124,7 +125,6 @@ impl Emitter {
124125
}
125126
}
126127

127-
128128
pub(crate) struct HandleData {
129129
env: Env,
130130
this_ref: Ref<()>,
@@ -156,3 +156,106 @@ impl HandleData {
156156
Ok(())
157157
}
158158
}
159+
160+
fn bind_socket(env: Env, fd: i32, domain: i32, port: JsNumber, ip: JsString) -> Result<JsNumber> {
161+
let mut on: i32 = 1;
162+
resolve_libc_err(unsafe {
163+
libc::setsockopt(
164+
fd,
165+
libc::SOL_SOCKET,
166+
libc::SO_REUSEADDR,
167+
&mut on as *mut _ as *mut c_void,
168+
mem::size_of::<i32>() as u32,
169+
)
170+
})?;
171+
172+
let mut on: i32 = 1;
173+
resolve_libc_err(unsafe {
174+
libc::setsockopt(
175+
fd,
176+
libc::SOL_SOCKET,
177+
libc::SO_REUSEPORT,
178+
&mut on as *mut _ as *mut c_void,
179+
mem::size_of::<i32>() as u32,
180+
)
181+
})?;
182+
183+
// parse ip port
184+
let ip = ip.into_utf8()?;
185+
let ip_str = CString::new(ip.as_str()?.to_string().into_bytes())?;
186+
let mut addr = unsafe { mem::MaybeUninit::<sockaddr_storage>::zeroed().assume_init() };
187+
let addr_len: u32;
188+
let port = port.get_int32()?;
189+
if domain == libc::AF_INET {
190+
resolve_uv_err(unsafe {
191+
sys::uv_ip4_addr(
192+
ip_str.as_c_str().as_ptr(),
193+
port,
194+
&mut addr as *mut _ as *mut sys::sockaddr_in,
195+
)
196+
})?;
197+
addr_len = mem::size_of::<sys::sockaddr_in>() as u32;
198+
} else {
199+
resolve_uv_err(unsafe {
200+
sys::uv_ip6_addr(
201+
ip_str.as_c_str().as_ptr(),
202+
port,
203+
&mut addr as *mut _ as *mut sys::sockaddr_in6,
204+
)
205+
})?;
206+
addr_len = mem::size_of::<sys::sockaddr_in6>() as u32;
207+
};
208+
209+
// bind socket
210+
resolve_libc_err(unsafe {
211+
libc::bind(
212+
fd,
213+
&mut addr as *mut _ as *mut libc::sockaddr,
214+
addr_len,
215+
)
216+
})?;
217+
218+
Ok(env.create_int32(fd)?)
219+
}
220+
221+
#[allow(dead_code)]
222+
#[napi]
223+
fn socket_new_so_reuseport_fd(
224+
env: Env,
225+
domain: JsString,
226+
port: JsNumber,
227+
ip: JsString,
228+
) -> Result<JsNumber> {
229+
let domain = domain.into_utf8()?;
230+
let s = domain.as_str()?;
231+
let domain = match s {
232+
"ipv4" => libc::AF_INET,
233+
"ipv6" => libc::AF_INET6,
234+
_ => {
235+
return Err(error(
236+
"unexpected domain paramter, expect 'ipv4' or 'ipv6'".to_string(),
237+
))
238+
}
239+
};
240+
241+
// create socket and set SO_REUSEPORT
242+
let fd = resolve_libc_err(unsafe { libc::socket(domain, libc::SOCK_STREAM, 0) })?;
243+
244+
let fd = match bind_socket(env, fd, domain, port, ip) {
245+
Ok(fd) => fd,
246+
Err(e) => {
247+
close(fd).unwrap();
248+
return Err(e)
249+
}
250+
};
251+
252+
Ok(fd)
253+
}
254+
255+
#[allow(dead_code)]
256+
#[napi]
257+
fn socket_close(fd: JsNumber) -> Result<()> {
258+
let fd = fd.get_int32()?;
259+
260+
close(fd)
261+
}

0 commit comments

Comments
 (0)