Skip to content

Commit d9bd69b

Browse files
committed
refactor: add Emitter
1 parent e919d10 commit d9bd69b

File tree

4 files changed

+87
-65
lines changed

4 files changed

+87
-65
lines changed

scripts/dgram_memory.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ module.exports = {
5050
if (module === require.main) {
5151
setInterval(() => {
5252
sendSomeBufs().catch((err) => {
53-
console.error(err);
53+
console.error('receive error', err);
5454
});
5555
if (global.gc) {
5656
global.gc()

src/dgram.rs

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use napi::{Env, JsBuffer, JsFunction, JsNumber, JsObject, JsString, JsUnknown, R
88
use nix::{self, errno::errno};
99
use uv_sys::sys::{self, uv_poll_event};
1010

11-
use crate::socket::{close, get_loop, sockaddr_from_string};
11+
use crate::socket::{close, get_loop, sockaddr_from_string, Emitter};
1212
use crate::util::{
1313
addr_to_string, buf_into_vec, check_emit, error, get_err, i8_slice_into_u8_slice,
1414
resolve_libc_err, resolve_uv_err, set_clo_exec, set_non_block, socket_addr_to_string,
@@ -133,7 +133,7 @@ pub struct DgramSocketWrap {
133133
env: Env,
134134
handle: *mut sys::uv_poll_t,
135135
msg_queue: LinkedList<MsgItem>,
136-
emit_ref: Ref<()>,
136+
emitter: Emitter,
137137
}
138138

139139
impl DgramSocketWrap {
@@ -151,7 +151,6 @@ impl DgramSocketWrap {
151151
set_clo_exec(fd)?;
152152

153153
let emit_fn: JsFunction = this.get_named_property("emit")?;
154-
let emit_ref = env.create_reference(emit_fn)?;
155154
let handle = Box::into_raw(Box::new(unsafe {
156155
mem::MaybeUninit::<sys::uv_poll_t>::zeroed().assume_init()
157156
}));
@@ -160,7 +159,7 @@ impl DgramSocketWrap {
160159
handle,
161160
msg_queue: LinkedList::new(),
162161
env,
163-
emit_ref,
162+
emitter: Emitter::new(env, emit_fn)?,
164163
};
165164

166165
env.wrap(&mut this, socket)?;
@@ -173,19 +172,6 @@ impl DgramSocketWrap {
173172
Ok(())
174173
}
175174

176-
// TODO DRY
177-
fn emit(&mut self, args: &[JsUnknown]) -> Result<()> {
178-
let env = self.env;
179-
180-
env.run_in_scope(|| {
181-
let emit: JsFunction = env.get_reference_value(&self.emit_ref)?;
182-
emit.call(None, args).unwrap();
183-
Ok(())
184-
})?;
185-
186-
Ok(())
187-
}
188-
189175
fn emit_error(&mut self, error: napi::Error) {
190176
let env = self.env;
191177

@@ -194,7 +180,7 @@ impl DgramSocketWrap {
194180
let event = env.create_string("_error").unwrap();
195181
let error = self.env.create_error(error).unwrap();
196182
self
197-
.emit(&[event.into_unknown(), error.into_unknown()])
183+
.emitter.emit(&[event.into_unknown(), error.into_unknown()])
198184
.unwrap();
199185
Ok(())
200186
})
@@ -407,8 +393,6 @@ impl DgramSocketWrap {
407393
};
408394

409395
// release Ref<JsFunction> in msg_queue
410-
self.emit_ref.unref(env)?;
411-
412396
loop {
413397
let msg = self.msg_queue.pop_front();
414398
if msg.is_none() {
@@ -428,7 +412,8 @@ impl DgramSocketWrap {
428412
close(self.fd)?;
429413

430414
let event = env.create_string("close")?;
431-
self.emit(&[event.into_unknown()])?;
415+
self.emitter.emit(&[event.into_unknown()])?;
416+
self.emitter.unref()?;
432417

433418
Ok(())
434419
}
@@ -492,7 +477,7 @@ pub fn on_readable(s: &mut DgramSocketWrap) -> Result<()> {
492477
args.push(js_sockname.into_unknown());
493478
}
494479

495-
let _ = s.emit(&args).map_err(|e| {
480+
let _ = s.emitter.emit(&args).map_err(|e| {
496481
let _ = env.throw_error(&e.reason, None);
497482
});
498483
}

src/seqpacket.rs

Lines changed: 13 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ use std::collections::LinkedList;
22
use std::mem;
33
use std::os::raw::c_int;
44

5-
use crate::socket::{self, get_loop, sockaddr_from_string};
5+
use crate::socket::{self, get_loop, sockaddr_from_string, Emitter};
66
use crate::util::{
7-
addr_to_string, buf_into_vec, error, get_err, resolve_libc_err, resolve_uv_err, set_clo_exec,
8-
set_non_block, socket_addr_to_string, uv_err_msg, check_emit,
7+
addr_to_string, buf_into_vec, check_emit, error, get_err, resolve_libc_err, resolve_uv_err,
8+
set_clo_exec, set_non_block, socket_addr_to_string, uv_err_msg,
99
};
1010
use libc::{sockaddr, sockaddr_un, EAGAIN, EINTR, EINVAL, ENOBUFS, EWOULDBLOCK};
1111
use napi::{Env, JsBuffer, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result};
@@ -42,7 +42,7 @@ struct SeqpacketSocketWrap {
4242
read_buf_size: usize,
4343
state: State,
4444
poll_events: i32,
45-
emit_ref: Ref<()>,
45+
emitter: Emitter,
4646
}
4747

4848
fn unwrap<'a>(env: &'a Env, this: &JsObject) -> Result<&'a mut SeqpacketSocketWrap> {
@@ -146,8 +146,6 @@ impl SeqpacketSocketWrap {
146146
set_clo_exec(fd)?;
147147

148148
let emit_fn = this.get_named_property::<JsFunction>("emit")?;
149-
let emit_ref = env.create_reference(emit_fn)?;
150-
151149
let handle = Box::into_raw(Box::new(unsafe {
152150
mem::MaybeUninit::<sys::uv_poll_t>::zeroed().assume_init()
153151
}));
@@ -157,7 +155,7 @@ impl SeqpacketSocketWrap {
157155
let wrap = Self {
158156
// this,
159157
fd,
160-
emit_ref,
158+
emitter: Emitter::new(env, emit_fn)?,
161159
env,
162160
handle,
163161
msg_queue: LinkedList::new(),
@@ -203,21 +201,18 @@ impl SeqpacketSocketWrap {
203201
}
204202

205203
// release js objects
206-
self.emit_ref.unref(env)?;
207-
208204
socket::close(self.fd)?;
209-
210205
self.state = State::Closed;
211-
212-
self.emit_event("close")?;
206+
self.emitter.emit_event("close")?;
207+
self.emitter.unref()?;
213208

214209
Ok(())
215210
}
216211

217212
fn shutdown_write(&mut self) -> Result<()> {
218213
resolve_libc_err(unsafe { libc::shutdown(self.fd, libc::SHUT_WR) })?;
219214
self.state = State::ShutDown;
220-
self.emit_event("_shutdown")?;
215+
self.emitter.emit_event("_shutdown")?;
221216
Ok(())
222217
}
223218

@@ -230,37 +225,14 @@ impl SeqpacketSocketWrap {
230225
let event = env.create_string("_error").unwrap();
231226
let error = self.env.create_error(error).unwrap();
232227
self
228+
.emitter
233229
.emit(&[event.into_unknown(), error.into_unknown()])
234230
.unwrap();
235231
Ok(())
236232
})
237233
.unwrap();
238234
}
239235

240-
fn emit_event(&mut self, event: &str) -> Result<()> {
241-
let env = self.env;
242-
env.run_in_scope(|| {
243-
let js_event = env.create_string(event)?;
244-
let mut args: Vec<JsUnknown> = vec![];
245-
args.push(js_event.into_unknown());
246-
247-
self.emit(&args)
248-
})?;
249-
Ok(())
250-
}
251-
252-
fn emit(&mut self, args: &[JsUnknown]) -> Result<()> {
253-
let env = self.env;
254-
255-
env.run_in_scope(|| {
256-
let emit: JsFunction = env.get_reference_value(&self.emit_ref)?;
257-
emit.call(None, args)?;
258-
Ok(())
259-
})?;
260-
261-
Ok(())
262-
}
263-
264236
fn bind(&self, bindpath: &str) -> Result<()> {
265237
unsafe {
266238
let (sockaddr, _) = sockaddr_from_string(bindpath)?;
@@ -280,7 +252,7 @@ impl SeqpacketSocketWrap {
280252
}
281253

282254
// FIXME: error ignored
283-
let _ = self.emit_event("_connect");
255+
let _ = self.emitter.emit_event("_connect");
284256
}
285257

286258
fn handle_socket(&mut self, status: i32, _events: i32) {
@@ -314,7 +286,7 @@ impl SeqpacketSocketWrap {
314286
args.push(js_fd.into_unknown());
315287
let js_addr = env.create_string(&addr)?;
316288
args.push(js_addr.into_unknown());
317-
self.emit(&args)?;
289+
self.emitter.emit(&args)?;
318290
Ok(())
319291
}) {
320292
Ok(_) => {}
@@ -475,7 +447,7 @@ impl SeqpacketSocketWrap {
475447
args.push(js_event.into_unknown());
476448
let js_buf = env.create_buffer_with_data(buf[0..size].to_vec())?;
477449
args.push(js_buf.into_unknown());
478-
self.emit(&args)?;
450+
self.emitter.emit(&args)?;
479451
Ok(())
480452
})?;
481453

@@ -629,7 +601,7 @@ impl SeqpacketSocketWrap {
629601
Some(cb) => {
630602
let cb_ref = env.create_reference(cb)?;
631603
Some(cb_ref)
632-
},
604+
}
633605
None => None,
634606
},
635607
});

src/socket.rs

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::mem;
22
use std::{str::FromStr};
33

44
use libc::{sockaddr_un};
5-
use napi::{Result, Env};
5+
use napi::{Result, Env, Ref, JsFunction, JsUnknown};
66
use uv_sys::sys;
77
use crate::util::{get_err, error};
88

@@ -42,3 +42,68 @@ pub(crate) unsafe fn sockaddr_from_string(bytes: &str) -> Result<(sockaddr_un, u
4242

4343
Ok((sockaddr, mem::size_of::<sockaddr_un>()))
4444
}
45+
46+
47+
pub(crate) struct Emitter {
48+
env: Env,
49+
emit_ref: Option<Ref<()>>,
50+
}
51+
52+
impl Emitter {
53+
pub fn new(env: Env, emit: JsFunction) -> Result<Self> {
54+
let emit_ref = env.create_reference(emit)?;
55+
56+
Ok(Self {
57+
env,
58+
emit_ref: Some(emit_ref),
59+
})
60+
}
61+
62+
pub fn unref(&mut self) -> Result<()> {
63+
let mut emit_ref = self.emit_ref.take();
64+
65+
match emit_ref.as_mut() {
66+
None => (),
67+
Some(emit_ref) => {
68+
emit_ref.unref(self.env)?;
69+
}
70+
}
71+
72+
Ok(())
73+
}
74+
75+
fn check_ref(&self) -> Result<()> {
76+
if self.emit_ref.is_none() {
77+
return Err(error("emitter already unreferenced".to_string()));
78+
}
79+
80+
Ok(())
81+
}
82+
83+
pub fn emit(&mut self, args: &[JsUnknown]) -> Result<()> {
84+
self.check_ref()?;
85+
86+
let env = self.env;
87+
88+
env.run_in_scope(|| {
89+
let emit_ref = self.emit_ref.as_mut().unwrap();
90+
let emit: JsFunction = env.get_reference_value(emit_ref)?;
91+
emit.call(None, args)?;
92+
Ok(())
93+
})?;
94+
95+
Ok(())
96+
}
97+
98+
pub fn emit_event(&mut self, event: &str) -> Result<()> {
99+
let env = self.env;
100+
env.run_in_scope(|| {
101+
let js_event = env.create_string(event)?;
102+
let mut args: Vec<JsUnknown> = vec![];
103+
args.push(js_event.into_unknown());
104+
105+
self.emit(&args)
106+
})?;
107+
Ok(())
108+
}
109+
}

0 commit comments

Comments
 (0)