Skip to content

Commit f735720

Browse files
committed
Add support for external pid as group leader
Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent 47bbff1 commit f735720

File tree

9 files changed

+174
-47
lines changed

9 files changed

+174
-47
lines changed

src/libAtomVM/context.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,17 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal)
217217
return true;
218218
}
219219

220+
bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal)
221+
{
222+
size_t leader_term_size = memory_estimate_usage(signal->signal_term);
223+
ctx->group_leader = UNDEFINED_ATOM;
224+
if (UNLIKELY(memory_ensure_free_opt(ctx, leader_term_size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
225+
return false;
226+
}
227+
ctx->group_leader = memory_copy_term_tree(&ctx->heap, signal->signal_term);
228+
return true;
229+
}
230+
220231
void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info)
221232
{
222233
context_update_flags(ctx, ~Trap, NoFlags);

src/libAtomVM/context.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,15 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal)
393393
*/
394394
void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info);
395395

396+
/**
397+
* @brief Process set group leader signal
398+
*
399+
* @param ctx the context being executed
400+
* @param signal the message with the group leader term
401+
* @return \c true if successful, \c false in case of memory error
402+
*/
403+
bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal);
404+
396405
/**
397406
* @brief Get process information.
398407
*

src/libAtomVM/dist_nifs.c

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -496,10 +496,10 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
496496
}
497497
term roots[4];
498498
roots[0] = argv[0];
499-
roots[1] = argv[1];
499+
roots[1] = argv[1]; // dist handle, ensure it's not garbage collected until we return
500500
roots[2] = control;
501501
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
502-
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
502+
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(5)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
503503
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
504504
}
505505
control = roots[2];
@@ -516,15 +516,18 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
516516
if (UNLIKELY(!term_is_pid(from))) {
517517
RAISE_ERROR(BADARG_ATOM);
518518
}
519-
// term groupleader = term_get_tuple_element(control, 3);
520-
// TODO: handle groupleader which is an externalpid
519+
term groupleader = term_get_tuple_element(control, 3);
520+
if (UNLIKELY(!term_is_pid(groupleader))) {
521+
RAISE_ERROR(BADARG_ATOM);
522+
}
521523
term options = term_get_tuple_element(control, 5);
522524

523-
term request_tuple = term_alloc_tuple(4, &ctx->heap);
525+
term request_tuple = term_alloc_tuple(5, &ctx->heap);
524526
term_put_tuple_element(request_tuple, 0, roots[0]);
525527
term_put_tuple_element(request_tuple, 1, reqid);
526528
term_put_tuple_element(request_tuple, 2, from);
527-
term_put_tuple_element(request_tuple, 3, options);
529+
term_put_tuple_element(request_tuple, 3, groupleader);
530+
term_put_tuple_element(request_tuple, 4, options);
528531
term request_opt = term_alloc_tuple(2, &ctx->heap);
529532
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
530533
term_put_tuple_element(request_opt, 1, request_tuple);

src/libAtomVM/mailbox.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ void mailbox_message_dispose(MailboxMessage *m, Heap *heap)
9696
break;
9797
}
9898
case KillSignal:
99-
case TrapAnswerSignal: {
99+
case TrapAnswerSignal:
100+
case SetGroupLeaderSignal: {
100101
struct TermSignal *term_signal = CONTAINER_OF(m, struct TermSignal, base);
101102
term mso_list = term_signal->storage[STORAGE_MSO_LIST_INDEX];
102103
HeapFragment *fragment = mailbox_message_to_heap_fragment(term_signal, term_signal->heap_end);

src/libAtomVM/mailbox.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ enum MessageType
8989
TrapExceptionSignal,
9090
FlushMonitorSignal,
9191
FlushInfoMonitorSignal,
92+
SetGroupLeaderSignal,
9293
};
9394

9495
struct MailboxMessage

src/libAtomVM/memory.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,9 @@ static enum MemoryGCResult memory_gc(Context *ctx, size_t new_size, size_t num_r
300300
TRACE("- Running copy GC on exit reason\n");
301301
ctx->exit_reason = memory_shallow_copy_term(old_root_fragment, ctx->exit_reason, &ctx->heap.heap_ptr, true);
302302

303+
TRACE("- Running copy GC on group leader\n");
304+
ctx->group_leader = memory_shallow_copy_term(old_root_fragment, ctx->group_leader, &ctx->heap.heap_ptr, true);
305+
303306
TRACE("- Running copy GC on provided roots\n");
304307
for (size_t i = 0; i < num_roots; i++) {
305308
roots[i] = memory_shallow_copy_term(old_root_fragment, roots[i], &ctx->heap.heap_ptr, 1);
@@ -373,6 +376,8 @@ static enum MemoryGCResult memory_shrink(Context *ctx, size_t new_size, size_t n
373376
}
374377
// ...exit_reason
375378
memory_scan_and_rewrite(1, &ctx->exit_reason, old_heap_root, old_end, delta, true);
379+
// ...group_leader
380+
memory_scan_and_rewrite(1, &ctx->group_leader, old_heap_root, old_end, delta, true);
376381
// ...and MSO list.
377382
term *mso_ptr = &ctx->heap.root->mso_list;
378383
while (!term_is_nil(*mso_ptr)) {

src/libAtomVM/nifs.c

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,14 +1211,21 @@ static NativeHandlerResult process_console_mailbox(Context *ctx)
12111211

12121212
// Common handling of spawn/1, spawn/3, spawn_opt/2, spawn_opt/4
12131213
// opts_term is [] for spawn/1,3
1214-
static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
1214+
static term do_spawn(Context *ctx, Context *new_ctx, size_t arity, size_t n_freeze, term opts_term)
12151215
{
12161216
term min_heap_size_term = interop_proplist_get_value(opts_term, MIN_HEAP_SIZE_ATOM);
12171217
term max_heap_size_term = interop_proplist_get_value(opts_term, MAX_HEAP_SIZE_ATOM);
12181218
term link_term = interop_proplist_get_value(opts_term, LINK_ATOM);
12191219
term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM);
12201220
term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM);
12211221
term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM);
1222+
term group_leader;
1223+
1224+
if (UNLIKELY(request_term != term_nil())) {
1225+
group_leader = term_get_tuple_element(request_term, 3);
1226+
} else {
1227+
group_leader = ctx->group_leader;
1228+
}
12221229

12231230
if (min_heap_size_term != term_nil()) {
12241231
if (UNLIKELY(!term_is_integer(min_heap_size_term))) {
@@ -1245,6 +1252,21 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
12451252
}
12461253
}
12471254

1255+
int size = 0;
1256+
for (uint32_t i = 0; i < n_freeze; i++) {
1257+
size += memory_estimate_usage(new_ctx->x[i + arity - n_freeze]);
1258+
}
1259+
size += memory_estimate_usage(group_leader);
1260+
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
1261+
//TODO: new process should be terminated, however a new pid is returned anyway
1262+
fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n");
1263+
AVM_ABORT();
1264+
}
1265+
new_ctx->group_leader = memory_copy_term_tree(&new_ctx->heap, group_leader);
1266+
for (uint32_t i = 0; i < arity; i++) {
1267+
new_ctx->x[i] = memory_copy_term_tree(&new_ctx->heap, new_ctx->x[i]);
1268+
}
1269+
12481270
switch (heap_growth_strategy) {
12491271
case BOUNDED_FREE_ATOM:
12501272
new_ctx->heap_growth_strategy = BoundedFreeHeapGrowth;
@@ -1308,7 +1330,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
13081330
term dhandle = term_get_tuple_element(request_term, 0);
13091331
term request_ref = term_get_tuple_element(request_term, 1);
13101332
term request_from = term_get_tuple_element(request_term, 2);
1311-
term request_opts = term_get_tuple_element(request_term, 3);
1333+
term request_opts = term_get_tuple_element(request_term, 4);
13121334
monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM);
13131335
// TODO handle link with external nodes
13141336
// link_term = interop_proplist_get_value(request_opts, LINK_ATOM);
@@ -1344,7 +1366,6 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
13441366
VALIDATE_VALUE(opts_term, term_is_list);
13451367

13461368
Context *new_ctx = context_new(ctx->global);
1347-
new_ctx->group_leader = ctx->group_leader;
13481369

13491370
const term *boxed_value = term_to_const_term_ptr(fun_term);
13501371

@@ -1365,24 +1386,15 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
13651386

13661387
// TODO: new process should fail with badarity if arity != 0
13671388

1368-
int size = 0;
13691389
for (uint32_t i = 0; i < n_freeze; i++) {
1370-
size += memory_estimate_usage(boxed_value[i + 3]);
1371-
}
1372-
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
1373-
//TODO: new process should be terminated, however a new pid is returned anyway
1374-
fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n");
1375-
AVM_ABORT();
1376-
}
1377-
for (uint32_t i = 0; i < n_freeze; i++) {
1378-
new_ctx->x[i + arity - n_freeze] = memory_copy_term_tree(&new_ctx->heap, boxed_value[i + 3]);
1390+
new_ctx->x[i + arity - n_freeze] = boxed_value[i + 3];
13791391
}
13801392

13811393
new_ctx->saved_module = fun_module;
13821394
new_ctx->saved_ip = fun_module->labels[label];
13831395
new_ctx->cp = module_address(fun_module->module_index, fun_module->end_instruction_ii);
13841396

1385-
return do_spawn(ctx, new_ctx, opts_term);
1397+
return do_spawn(ctx, new_ctx, arity, n_freeze, opts_term);
13861398
}
13871399

13881400
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
@@ -1399,7 +1411,6 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
13991411
VALIDATE_VALUE(opts_term, term_is_list);
14001412

14011413
Context *new_ctx = context_new(ctx->global);
1402-
new_ctx->group_leader = ctx->group_leader;
14031414

14041415
AtomString module_string = globalcontext_atomstring_from_term(ctx->global, argv[0]);
14051416
AtomString function_string = globalcontext_atomstring_from_term(ctx->global, argv[1]);
@@ -1439,14 +1450,8 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
14391450
new_ctx->min_heap_size = min_heap_size;
14401451
}
14411452

1442-
avm_int_t size = memory_estimate_usage(args_term);
1443-
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
1444-
// Context was not scheduled yet, we can destroy it.
1445-
context_destroy(new_ctx);
1446-
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
1447-
}
14481453
while (term_is_nonempty_list(args_term)) {
1449-
new_ctx->x[reg_index] = memory_copy_term_tree(&new_ctx->heap, term_get_list_head(args_term));
1454+
new_ctx->x[reg_index] = term_get_list_head(args_term);
14501455
reg_index++;
14511456

14521457
args_term = term_get_list_tail(args_term);
@@ -1456,7 +1461,7 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
14561461
}
14571462
}
14581463

1459-
return do_spawn(ctx, new_ctx, opts_term);
1464+
return do_spawn(ctx, new_ctx, reg_index, reg_index, opts_term);
14601465
}
14611466

14621467
static term nif_erlang_send_2(Context *ctx, int argc, term argv[])
@@ -4003,15 +4008,21 @@ static term nif_erlang_group_leader(Context *ctx, int argc, term argv[])
40034008
term leader = argv[0];
40044009
term pid = argv[1];
40054010
VALIDATE_VALUE(pid, term_is_local_pid);
4006-
VALIDATE_VALUE(leader, term_is_local_pid);
4011+
VALIDATE_VALUE(leader, term_is_pid);
40074012

40084013
int local_process_id = term_to_local_process_id(pid);
40094014
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
40104015
if (IS_NULL_PTR(target)) {
40114016
RAISE_ERROR(BADARG_ATOM);
40124017
}
40134018

4014-
target->group_leader = leader;
4019+
if (term_is_local_pid(leader)) {
4020+
// We cannot put leader term on the heap
4021+
mailbox_send_term_signal(target, SetGroupLeaderSignal, leader);
4022+
} else {
4023+
target->group_leader = leader;
4024+
}
4025+
40154026
globalcontext_get_process_unlock(ctx->global, target);
40164027
return TRUE_ATOM;
40174028
}

src/libAtomVM/opcodesswitch.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,15 @@ static void destroy_extended_registers(Context *ctx, unsigned int live)
10491049
context_process_flush_monitor_signal(ctx, flush_signal->ref_ticks, info); \
10501050
break; \
10511051
} \
1052+
case SetGroupLeaderSignal: { \
1053+
struct TermSignal *group_leader \
1054+
= CONTAINER_OF(signal_message, struct TermSignal, base); \
1055+
if (UNLIKELY(!context_process_signal_set_group_leader(ctx, group_leader))) { \
1056+
SET_ERROR(OUT_OF_MEMORY_ATOM); \
1057+
next_label = &&handle_error; \
1058+
} \
1059+
break; \
1060+
} \
10521061
case NormalMessage: { \
10531062
UNREACHABLE(); \
10541063
} \

tests/libs/estdlib/test_net_kernel.erl

Lines changed: 92 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ test() ->
3737
ok = test_rpc_loop_from_beam(Platform),
3838
ok = test_autoconnect_fail(Platform),
3939
ok = test_autoconnect_to_beam(Platform),
40+
ok = test_groupleader(Platform),
4041
ok;
4142
false ->
4243
io:format("~s: skipped\n", [?MODULE]),
@@ -156,21 +157,24 @@ test_autoconnect_to_beam(Platform) ->
156157
{ok, _NetKernelPid} = net_kernel_start(Platform, atomvm),
157158
Node = node(),
158159
erlang:set_cookie(Node, 'AtomVM'),
159-
spawn_link(fun() ->
160-
execute_command(
161-
Platform,
162-
"erl -sname otp -setcookie AtomVM -eval \""
163-
"register(beam, self()),"
164-
"F = fun(G) ->"
165-
" receive"
166-
" {Caller, ping} -> Caller ! {self(), pong}, G(G);"
167-
" {Caller, quit} -> Caller ! {self(), quit}"
168-
" after 5000 -> timeout"
169-
" end "
170-
"end, "
171-
"F(F).\" -s init stop -noshell"
172-
)
173-
end),
160+
{Pid, MonitorRef} = spawn_opt(
161+
fun() ->
162+
[] = execute_command(
163+
Platform,
164+
"erl -sname otp -setcookie AtomVM -eval \""
165+
"register(beam, self()),"
166+
"F = fun(G) ->"
167+
" receive"
168+
" {Caller, ping} -> Caller ! {self(), pong}, G(G);"
169+
" {Caller, quit} -> Caller ! {self(), quit}"
170+
" after 5000 -> exit(timeout)"
171+
" end "
172+
"end, "
173+
"F(F).\" -s init stop -noshell"
174+
)
175+
end,
176+
[link, monitor]
177+
),
174178
% Wait sufficiently for beam to be up, without connecting to it since
175179
% that's part of the test
176180
timer:sleep(1000),
@@ -200,6 +204,79 @@ test_autoconnect_to_beam(Platform) ->
200204
{OTPPid, quit} -> ok
201205
after 5000 -> timeout
202206
end,
207+
normal =
208+
receive
209+
{'DOWN', MonitorRef, process, Pid, Reason} -> Reason
210+
after 5000 -> timeout
211+
end,
212+
net_kernel:stop(),
213+
ok.
214+
215+
test_groupleader(Platform) ->
216+
{ok, _NetKernelPid} = net_kernel_start(Platform, atomvm),
217+
Node = node(),
218+
erlang:set_cookie(Node, 'AtomVM'),
219+
register(atomvm, self()),
220+
Parent = self(),
221+
{Pid, MonitorRef} = spawn_opt(
222+
fun() ->
223+
Result = execute_command(
224+
Platform,
225+
"erl -sname otp -setcookie AtomVM -eval \""
226+
"{atomvm, '" ++ atom_to_list(Node) ++
227+
"'} ! {beam, self()}, "
228+
"F = fun(G) ->"
229+
" receive"
230+
" {Caller, apply, M, F, A} -> Result = apply(M, F, A), Caller ! {self(), Result}, G(G);"
231+
" {Caller, quit} -> Caller ! {self(), quit}"
232+
" after 5000 -> exit(timeout)"
233+
" end "
234+
"end, "
235+
"F(F).\" -s init stop -noshell"
236+
),
237+
Parent ! {io_result, Result}
238+
end,
239+
[link, monitor]
240+
),
241+
BeamMainPid =
242+
receive
243+
{beam, BeamMainPid0} ->
244+
BeamMainPid0;
245+
{io_result, Result0} ->
246+
io:format("~s\n", [Result0]),
247+
exit(timeout)
248+
after 5000 -> exit(timeout)
249+
end,
250+
BeamMainPid ! {self(), apply, rpc, call, [Node, io, format, ["hello group leader"]]},
251+
ok =
252+
receive
253+
{BeamMainPid, Result} ->
254+
Result;
255+
{io_result, Result1} ->
256+
io:format("~s\n", [Result1]),
257+
exit(timeout)
258+
after 5000 -> exit(timeout)
259+
end,
260+
BeamMainPid ! {self(), quit},
261+
ok =
262+
receive
263+
{BeamMainPid, quit} ->
264+
ok;
265+
{io_result, Result2} ->
266+
io:format("~s\n", [Result2]),
267+
exit(timeout)
268+
after 5000 -> timeout
269+
end,
270+
"hello group leader" =
271+
receive
272+
{io_result, IOResult} -> IOResult
273+
after 5000 -> timeout
274+
end,
275+
normal =
276+
receive
277+
{'DOWN', MonitorRef, process, Pid, Reason} -> Reason
278+
after 5000 -> timeout
279+
end,
203280
net_kernel:stop(),
204281
ok.
205282

0 commit comments

Comments
 (0)