Skip to content
This repository was archived by the owner on Jun 10, 2024. It is now read-only.

Commit 9ff19de

Browse files
committed
:erlang.link/1
Processes are linked and when either sides exits, it propagates an exit to the other side of the link.
1 parent ee5b8fe commit 9ff19de

File tree

6 files changed

+336
-48
lines changed

6 files changed

+336
-48
lines changed

liblumen_alloc/src/erts/process.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use core::sync::atomic::{AtomicU16, AtomicU64, AtomicUsize, Ordering};
1919

2020
use ::alloc::sync::Arc;
2121

22-
use hashbrown::HashMap;
22+
use hashbrown::{HashMap, HashSet};
2323
use intrusive_collections::{LinkedList, UnsafeRef};
2424

2525
use liblumen_core::locks::{Mutex, MutexGuard, RwLock, SpinLock};
@@ -100,6 +100,9 @@ pub struct ProcessControlBlock {
100100
code_stack: Mutex<code::stack::Stack>,
101101
pub status: RwLock<Status>,
102102
pub registered_name: RwLock<Option<Atom>>,
103+
/// Pids of processes that are linked to this process and need to be exited when this process
104+
/// exits
105+
pub linked_pid_set: Mutex<HashSet<Pid>>,
103106
pub mailbox: Mutex<RefCell<Mailbox>>,
104107
// process heap, cache line aligned to avoid false sharing with rest of struct
105108
heap: Mutex<ProcessHeap>,
@@ -140,6 +143,7 @@ impl ProcessControlBlock {
140143
run_reductions: Default::default(),
141144
total_reductions: Default::default(),
142145
registered_name: Default::default(),
146+
linked_pid_set: Default::default(),
143147
}
144148
}
145149

@@ -242,6 +246,8 @@ impl ProcessControlBlock {
242246
heap.virtual_alloc(bin)
243247
}
244248

249+
// Stack
250+
245251
/// Frees stack space occupied by the last term on the stack,
246252
/// adjusting the stack pointer accordingly.
247253
///
@@ -290,6 +296,23 @@ impl ProcessControlBlock {
290296
self.heap.lock().stack_used()
291297
}
292298

299+
// Links
300+
301+
pub fn link(&self, other: &ProcessControlBlock) {
302+
// link in order so that locks are always taken in the same order to prevent deadlocks
303+
if self.pid < other.pid {
304+
let mut self_pid_set = self.linked_pid_set.lock();
305+
let mut other_pid_set = other.linked_pid_set.lock();
306+
307+
self_pid_set.insert(other.pid);
308+
other_pid_set.insert(self.pid);
309+
} else {
310+
other.link(self)
311+
}
312+
}
313+
314+
// Pid
315+
293316
pub fn pid(&self) -> Pid {
294317
self.pid
295318
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// wasm32 proptest cannot be compiled at the same time as non-wasm32 proptest, so disable tests that
2+
// use proptest completely for wasm32
3+
//
4+
// See https://github.com/rust-lang/cargo/issues/4866
5+
#[cfg(all(not(target_arch = "wasm32"), test))]
6+
mod test;
7+
8+
use std::sync::Arc;
9+
10+
use liblumen_alloc::erts::exception;
11+
use liblumen_alloc::erts::exception::system::Alloc;
12+
use liblumen_alloc::erts::process::code::stack::frame::{Frame, Placement};
13+
use liblumen_alloc::erts::process::code::{self, result_from_exception};
14+
use liblumen_alloc::erts::process::ProcessControlBlock;
15+
use liblumen_alloc::erts::term::{atom_unchecked, Atom, Term, TypedTerm};
16+
use liblumen_alloc::{badarg, error, ModuleFunctionArity};
17+
18+
use crate::registry::pid_to_process;
19+
20+
pub fn place_frame_with_arguments(
21+
process: &ProcessControlBlock,
22+
placement: Placement,
23+
pid_or_port: Term,
24+
) -> Result<(), Alloc> {
25+
process.stack_push(pid_or_port)?;
26+
process.place_frame(frame(), placement);
27+
28+
Ok(())
29+
}
30+
31+
// Private
32+
33+
fn code(arc_process: &Arc<ProcessControlBlock>) -> code::Result {
34+
arc_process.reduce();
35+
36+
let pid_or_port = arc_process.stack_pop().unwrap();
37+
38+
match native(arc_process, pid_or_port) {
39+
Ok(true_term) => {
40+
arc_process.return_from_call(true_term)?;
41+
42+
ProcessControlBlock::call_code(arc_process)
43+
}
44+
Err(exception) => result_from_exception(arc_process, exception),
45+
}
46+
}
47+
48+
fn frame() -> Frame {
49+
Frame::new(module_function_arity(), code)
50+
}
51+
52+
fn function() -> Atom {
53+
Atom::try_from_str("link").unwrap()
54+
}
55+
56+
fn module_function_arity() -> Arc<ModuleFunctionArity> {
57+
Arc::new(ModuleFunctionArity {
58+
module: super::module(),
59+
function: function(),
60+
arity: 1,
61+
})
62+
}
63+
64+
fn native(process: &ProcessControlBlock, pid_or_port: Term) -> exception::Result {
65+
match pid_or_port.to_typed_term().unwrap() {
66+
TypedTerm::Pid(pid) => {
67+
if pid == process.pid() {
68+
Ok(true.into())
69+
} else {
70+
match pid_to_process(&pid) {
71+
Some(pid_arc_process) => {
72+
process.link(&pid_arc_process);
73+
74+
Ok(true.into())
75+
}
76+
None => Err(error!(atom_unchecked("noproc")).into()),
77+
}
78+
}
79+
}
80+
TypedTerm::Port(_) => unimplemented!(),
81+
TypedTerm::Boxed(boxed) => match boxed.to_typed_term().unwrap() {
82+
TypedTerm::ExternalPid(_) => unimplemented!(),
83+
TypedTerm::ExternalPort(_) => unimplemented!(),
84+
_ => Err(badarg!().into()),
85+
},
86+
_ => Err(badarg!().into()),
87+
}
88+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
mod with_local_pid;
2+
3+
use proptest::prop_assert_eq;
4+
use proptest::strategy::Strategy;
5+
use proptest::test_runner::{Config, TestRunner};
6+
7+
use liblumen_alloc::badarg;
8+
use liblumen_alloc::erts::process::ProcessControlBlock;
9+
10+
use crate::otp::erlang::link_1::native;
11+
use crate::scheduler::{with_process, with_process_arc};
12+
use crate::test::strategy;
13+
14+
#[test]
15+
fn without_pid_or_port_errors_badarg() {
16+
with_process_arc(|arc_process| {
17+
TestRunner::new(Config::with_source_file(file!()))
18+
.run(
19+
&strategy::term(arc_process.clone())
20+
.prop_filter("Cannot be pid or port", |pid_or_port| {
21+
!(pid_or_port.is_pid() || pid_or_port.is_port())
22+
}),
23+
|pid_or_port| {
24+
prop_assert_eq!(native(&arc_process, pid_or_port), Err(badarg!().into()));
25+
26+
Ok(())
27+
},
28+
)
29+
.unwrap();
30+
});
31+
}
32+
33+
fn link_count(process: &ProcessControlBlock) -> usize {
34+
process.linked_pid_set.lock().len()
35+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use super::*;
2+
3+
use liblumen_alloc::error;
4+
use liblumen_alloc::erts::process::code::stack::frame::Placement;
5+
use liblumen_alloc::erts::term::{atom_unchecked, next_pid};
6+
7+
use crate::otp::erlang;
8+
use crate::process;
9+
use crate::scheduler::Scheduler;
10+
11+
#[test]
12+
fn with_self_returns_true_but_does_not_create_link() {
13+
with_process(|process| {
14+
let link_count_before = link_count(process);
15+
16+
assert_eq!(native(process, process.pid_term()), Ok(true.into()));
17+
18+
assert_eq!(link_count(process), link_count_before);
19+
});
20+
}
21+
22+
#[test]
23+
fn with_non_existent_pid_errors_noproc() {
24+
with_process(|process| {
25+
let link_count_before = link_count(process);
26+
27+
assert_eq!(
28+
native(process, next_pid()),
29+
Err(error!(atom_unchecked("noproc")).into())
30+
);
31+
32+
assert_eq!(link_count(process), link_count_before);
33+
});
34+
}
35+
36+
#[test]
37+
fn with_existing_unlinked_pid_links_to_process() {
38+
with_process(|process| {
39+
let other_process = process::test(process);
40+
41+
let process_link_count_before = link_count(process);
42+
let other_process_link_count_before = link_count(process);
43+
44+
assert_eq!(native(process, other_process.pid_term()), Ok(true.into()));
45+
46+
assert_eq!(link_count(process), process_link_count_before + 1);
47+
assert_eq!(
48+
link_count(&other_process),
49+
other_process_link_count_before + 1
50+
);
51+
});
52+
}
53+
54+
#[test]
55+
fn with_existing_linked_pid_returns_true() {
56+
with_process(|process| {
57+
let other_process = process::test(process);
58+
59+
process.link(&other_process);
60+
61+
let process_link_count_before = link_count(process);
62+
let other_process_link_count_before = link_count(process);
63+
64+
assert_eq!(native(process, other_process.pid_term()), Ok(true.into()));
65+
66+
assert_eq!(link_count(process), process_link_count_before);
67+
assert_eq!(link_count(&other_process), other_process_link_count_before);
68+
});
69+
}
70+
71+
#[test]
72+
fn when_a_linked_process_exits_the_process_exits_too() {
73+
with_process(|process| {
74+
let other_arc_process = process::test(process);
75+
76+
assert_eq!(
77+
native(process, other_arc_process.pid_term()),
78+
Ok(true.into())
79+
);
80+
81+
assert!(Scheduler::current().run_through(&other_arc_process));
82+
83+
assert!(!other_arc_process.is_exiting());
84+
assert!(!process.is_exiting());
85+
86+
erlang::exit_1::place_frame_with_arguments(
87+
&other_arc_process,
88+
Placement::Replace,
89+
atom_unchecked("normal"),
90+
)
91+
.unwrap();
92+
93+
assert!(Scheduler::current().run_through(&other_arc_process));
94+
95+
assert!(other_arc_process.is_exiting());
96+
assert!(process.is_exiting())
97+
});
98+
}
99+
100+
#[test]
101+
fn when_the_process_exits_linked_processes_exit_too() {
102+
with_process_arc(|arc_process| {
103+
let other_arc_process = process::test(&arc_process);
104+
105+
assert_eq!(
106+
native(&arc_process, other_arc_process.pid_term()),
107+
Ok(true.into())
108+
);
109+
110+
assert!(Scheduler::current().run_through(&other_arc_process));
111+
112+
assert!(!other_arc_process.is_exiting());
113+
assert!(!arc_process.is_exiting());
114+
115+
erlang::exit_1::place_frame_with_arguments(
116+
&arc_process,
117+
Placement::Replace,
118+
atom_unchecked("normal"),
119+
)
120+
.unwrap();
121+
122+
assert!(Scheduler::current().run_through(&arc_process));
123+
124+
assert!(other_arc_process.is_exiting());
125+
assert!(arc_process.is_exiting())
126+
});
127+
}

lumen_runtime/src/process.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use hashbrown::HashMap;
77

88
use liblumen_core::locks::RwLockWriteGuard;
99

10+
use liblumen_alloc::erts::exception::runtime;
1011
use liblumen_alloc::erts::exception::system::Alloc;
1112
use liblumen_alloc::erts::process::code::stack::frame::{Frame, Placement};
1213
use liblumen_alloc::erts::process::code::Code;
@@ -19,6 +20,62 @@ use crate::code;
1920
use crate::otp::erlang::apply_3;
2021
use crate::registry::*;
2122
use crate::scheduler::Scheduler;
23+
use crate::system;
24+
#[cfg(test)]
25+
use crate::test;
26+
27+
fn is_expected_exit_reason(reason: Term) -> bool {
28+
match reason.to_typed_term().unwrap() {
29+
TypedTerm::Atom(atom) => match atom.name() {
30+
"normal" | "shutdown" => true,
31+
_ => false,
32+
},
33+
TypedTerm::Boxed(boxed) => match boxed.to_typed_term().unwrap() {
34+
TypedTerm::Tuple(tuple) => {
35+
tuple.len() == 2 && {
36+
match tuple[0].to_typed_term().unwrap() {
37+
TypedTerm::Atom(atom) => atom.name() == "shutdown",
38+
_ => false,
39+
}
40+
}
41+
}
42+
_ => false,
43+
},
44+
_ => false,
45+
}
46+
}
47+
48+
pub fn log_exit(process: &ProcessControlBlock, exception: &runtime::Exception) {
49+
match exception.class {
50+
runtime::Class::Exit => {
51+
let reason = exception.reason;
52+
53+
if !is_expected_exit_reason(reason) {
54+
system::io::puts(&format!(
55+
"** (EXIT from {}) exited with reason: {}",
56+
process, reason
57+
));
58+
}
59+
}
60+
runtime::Class::Error { .. } => system::io::puts(&format!(
61+
"** (EXIT from {}) exited with reason: an exception was raised: {}\n{}",
62+
process,
63+
exception.reason,
64+
process.stacktrace()
65+
)),
66+
_ => unimplemented!("{:?}", exception),
67+
}
68+
}
69+
70+
pub fn propagate_exit(process: &ProcessControlBlock) {
71+
for linked_pid in process.linked_pid_set.lock().iter() {
72+
if let Some(linked_pid_arc_process) = pid_to_process(linked_pid) {
73+
// only tell the linked process to exit. When it is run by its scheduler, it will
74+
// go through propagating its own exit.
75+
linked_pid_arc_process.exit()
76+
}
77+
}
78+
}
2279

2380
pub fn register_in(
2481
arc_process_control_block: Arc<ProcessControlBlock>,

0 commit comments

Comments
 (0)