Skip to content

Commit 279c44a

Browse files
committed
Merge pull request #1480 from pguyot/w04/erlang-distribution-04
Distribution: Add support for external pid as group leader Continuation of #1468 - Store group leader pid on the heap if it's an external pid - Introduce a specific signal to change group leader to an external pid as we cannot touch heap from another process - Add simple test based on rpc and BEAM These changes are made under both the "Apache 2.0" and the "GNU Lesser General Public License 2.1 or later" license terms (dual license). SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
2 parents ec336e4 + f735720 commit 279c44a

File tree

9 files changed

+174
-47
lines changed

9 files changed

+174
-47
lines changed

src/libAtomVM/context.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,17 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal)
217217
return true;
218218
}
219219

220+
bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal)
221+
{
222+
size_t leader_term_size = memory_estimate_usage(signal->signal_term);
223+
ctx->group_leader = UNDEFINED_ATOM;
224+
if (UNLIKELY(memory_ensure_free_opt(ctx, leader_term_size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
225+
return false;
226+
}
227+
ctx->group_leader = memory_copy_term_tree(&ctx->heap, signal->signal_term);
228+
return true;
229+
}
230+
220231
void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info)
221232
{
222233
context_update_flags(ctx, ~Trap, NoFlags);

src/libAtomVM/context.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,15 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal)
393393
*/
394394
void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info);
395395

396+
/**
397+
* @brief Process set group leader signal
398+
*
399+
* @param ctx the context being executed
400+
* @param signal the message with the group leader term
401+
* @return \c true if successful, \c false in case of memory error
402+
*/
403+
bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal);
404+
396405
/**
397406
* @brief Get process information.
398407
*

src/libAtomVM/dist_nifs.c

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -496,10 +496,10 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
496496
}
497497
term roots[4];
498498
roots[0] = argv[0];
499-
roots[1] = argv[1];
499+
roots[1] = argv[1]; // dist handle, ensure it's not garbage collected until we return
500500
roots[2] = control;
501501
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
502-
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
502+
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(5)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
503503
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
504504
}
505505
control = roots[2];
@@ -516,15 +516,18 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
516516
if (UNLIKELY(!term_is_pid(from))) {
517517
RAISE_ERROR(BADARG_ATOM);
518518
}
519-
// term groupleader = term_get_tuple_element(control, 3);
520-
// TODO: handle groupleader which is an externalpid
519+
term groupleader = term_get_tuple_element(control, 3);
520+
if (UNLIKELY(!term_is_pid(groupleader))) {
521+
RAISE_ERROR(BADARG_ATOM);
522+
}
521523
term options = term_get_tuple_element(control, 5);
522524

523-
term request_tuple = term_alloc_tuple(4, &ctx->heap);
525+
term request_tuple = term_alloc_tuple(5, &ctx->heap);
524526
term_put_tuple_element(request_tuple, 0, roots[0]);
525527
term_put_tuple_element(request_tuple, 1, reqid);
526528
term_put_tuple_element(request_tuple, 2, from);
527-
term_put_tuple_element(request_tuple, 3, options);
529+
term_put_tuple_element(request_tuple, 3, groupleader);
530+
term_put_tuple_element(request_tuple, 4, options);
528531
term request_opt = term_alloc_tuple(2, &ctx->heap);
529532
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
530533
term_put_tuple_element(request_opt, 1, request_tuple);

src/libAtomVM/mailbox.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ void mailbox_message_dispose(MailboxMessage *m, Heap *heap)
9696
break;
9797
}
9898
case KillSignal:
99-
case TrapAnswerSignal: {
99+
case TrapAnswerSignal:
100+
case SetGroupLeaderSignal: {
100101
struct TermSignal *term_signal = CONTAINER_OF(m, struct TermSignal, base);
101102
term mso_list = term_signal->storage[STORAGE_MSO_LIST_INDEX];
102103
HeapFragment *fragment = mailbox_message_to_heap_fragment(term_signal, term_signal->heap_end);

src/libAtomVM/mailbox.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ enum MessageType
8989
TrapExceptionSignal,
9090
FlushMonitorSignal,
9191
FlushInfoMonitorSignal,
92+
SetGroupLeaderSignal,
9293
};
9394

9495
struct MailboxMessage

src/libAtomVM/memory.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,9 @@ static enum MemoryGCResult memory_gc(Context *ctx, size_t new_size, size_t num_r
300300
TRACE("- Running copy GC on exit reason\n");
301301
ctx->exit_reason = memory_shallow_copy_term(old_root_fragment, ctx->exit_reason, &ctx->heap.heap_ptr, true);
302302

303+
TRACE("- Running copy GC on group leader\n");
304+
ctx->group_leader = memory_shallow_copy_term(old_root_fragment, ctx->group_leader, &ctx->heap.heap_ptr, true);
305+
303306
TRACE("- Running copy GC on provided roots\n");
304307
for (size_t i = 0; i < num_roots; i++) {
305308
roots[i] = memory_shallow_copy_term(old_root_fragment, roots[i], &ctx->heap.heap_ptr, 1);
@@ -373,6 +376,8 @@ static enum MemoryGCResult memory_shrink(Context *ctx, size_t new_size, size_t n
373376
}
374377
// ...exit_reason
375378
memory_scan_and_rewrite(1, &ctx->exit_reason, old_heap_root, old_end, delta, true);
379+
// ...group_leader
380+
memory_scan_and_rewrite(1, &ctx->group_leader, old_heap_root, old_end, delta, true);
376381
// ...and MSO list.
377382
term *mso_ptr = &ctx->heap.root->mso_list;
378383
while (!term_is_nil(*mso_ptr)) {

src/libAtomVM/nifs.c

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,14 +1211,21 @@ static NativeHandlerResult process_console_mailbox(Context *ctx)
12111211

12121212
// Common handling of spawn/1, spawn/3, spawn_opt/2, spawn_opt/4
12131213
// opts_term is [] for spawn/1,3
1214-
static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
1214+
static term do_spawn(Context *ctx, Context *new_ctx, size_t arity, size_t n_freeze, term opts_term)
12151215
{
12161216
term min_heap_size_term = interop_proplist_get_value(opts_term, MIN_HEAP_SIZE_ATOM);
12171217
term max_heap_size_term = interop_proplist_get_value(opts_term, MAX_HEAP_SIZE_ATOM);
12181218
term link_term = interop_proplist_get_value(opts_term, LINK_ATOM);
12191219
term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM);
12201220
term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM);
12211221
term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM);
1222+
term group_leader;
1223+
1224+
if (UNLIKELY(request_term != term_nil())) {
1225+
group_leader = term_get_tuple_element(request_term, 3);
1226+
} else {
1227+
group_leader = ctx->group_leader;
1228+
}
12221229

12231230
if (min_heap_size_term != term_nil()) {
12241231
if (UNLIKELY(!term_is_integer(min_heap_size_term))) {
@@ -1245,6 +1252,21 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
12451252
}
12461253
}
12471254

1255+
int size = 0;
1256+
for (uint32_t i = 0; i < n_freeze; i++) {
1257+
size += memory_estimate_usage(new_ctx->x[i + arity - n_freeze]);
1258+
}
1259+
size += memory_estimate_usage(group_leader);
1260+
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
1261+
//TODO: new process should be terminated, however a new pid is returned anyway
1262+
fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n");
1263+
AVM_ABORT();
1264+
}
1265+
new_ctx->group_leader = memory_copy_term_tree(&new_ctx->heap, group_leader);
1266+
for (uint32_t i = 0; i < arity; i++) {
1267+
new_ctx->x[i] = memory_copy_term_tree(&new_ctx->heap, new_ctx->x[i]);
1268+
}
1269+
12481270
switch (heap_growth_strategy) {
12491271
case BOUNDED_FREE_ATOM:
12501272
new_ctx->heap_growth_strategy = BoundedFreeHeapGrowth;
@@ -1308,7 +1330,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
13081330
term dhandle = term_get_tuple_element(request_term, 0);
13091331
term request_ref = term_get_tuple_element(request_term, 1);
13101332
term request_from = term_get_tuple_element(request_term, 2);
1311-
term request_opts = term_get_tuple_element(request_term, 3);
1333+
term request_opts = term_get_tuple_element(request_term, 4);
13121334
monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM);
13131335
// TODO handle link with external nodes
13141336
// link_term = interop_proplist_get_value(request_opts, LINK_ATOM);
@@ -1344,7 +1366,6 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
13441366
VALIDATE_VALUE(opts_term, term_is_list);
13451367

13461368
Context *new_ctx = context_new(ctx->global);
1347-
new_ctx->group_leader = ctx->group_leader;
13481369

13491370
const term *boxed_value = term_to_const_term_ptr(fun_term);
13501371

@@ -1365,24 +1386,15 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
13651386

13661387
// TODO: new process should fail with badarity if arity != 0
13671388

1368-
int size = 0;
13691389
for (uint32_t i = 0; i < n_freeze; i++) {
1370-
size += memory_estimate_usage(boxed_value[i + 3]);
1371-
}
1372-
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
1373-
//TODO: new process should be terminated, however a new pid is returned anyway
1374-
fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n");
1375-
AVM_ABORT();
1376-
}
1377-
for (uint32_t i = 0; i < n_freeze; i++) {
1378-
new_ctx->x[i + arity - n_freeze] = memory_copy_term_tree(&new_ctx->heap, boxed_value[i + 3]);
1390+
new_ctx->x[i + arity - n_freeze] = boxed_value[i + 3];
13791391
}
13801392

13811393
new_ctx->saved_module = fun_module;
13821394
new_ctx->saved_ip = fun_module->labels[label];
13831395
new_ctx->cp = module_address(fun_module->module_index, fun_module->end_instruction_ii);
13841396

1385-
return do_spawn(ctx, new_ctx, opts_term);
1397+
return do_spawn(ctx, new_ctx, arity, n_freeze, opts_term);
13861398
}
13871399

13881400
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
@@ -1399,7 +1411,6 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
13991411
VALIDATE_VALUE(opts_term, term_is_list);
14001412

14011413
Context *new_ctx = context_new(ctx->global);
1402-
new_ctx->group_leader = ctx->group_leader;
14031414

14041415
AtomString module_string = globalcontext_atomstring_from_term(ctx->global, argv[0]);
14051416
AtomString function_string = globalcontext_atomstring_from_term(ctx->global, argv[1]);
@@ -1439,14 +1450,8 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
14391450
new_ctx->min_heap_size = min_heap_size;
14401451
}
14411452

1442-
avm_int_t size = memory_estimate_usage(args_term);
1443-
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
1444-
// Context was not scheduled yet, we can destroy it.
1445-
context_destroy(new_ctx);
1446-
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
1447-
}
14481453
while (term_is_nonempty_list(args_term)) {
1449-
new_ctx->x[reg_index] = memory_copy_term_tree(&new_ctx->heap, term_get_list_head(args_term));
1454+
new_ctx->x[reg_index] = term_get_list_head(args_term);
14501455
reg_index++;
14511456

14521457
args_term = term_get_list_tail(args_term);
@@ -1456,7 +1461,7 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
14561461
}
14571462
}
14581463

1459-
return do_spawn(ctx, new_ctx, opts_term);
1464+
return do_spawn(ctx, new_ctx, reg_index, reg_index, opts_term);
14601465
}
14611466

14621467
static term nif_erlang_send_2(Context *ctx, int argc, term argv[])
@@ -4003,15 +4008,21 @@ static term nif_erlang_group_leader(Context *ctx, int argc, term argv[])
40034008
term leader = argv[0];
40044009
term pid = argv[1];
40054010
VALIDATE_VALUE(pid, term_is_local_pid);
4006-
VALIDATE_VALUE(leader, term_is_local_pid);
4011+
VALIDATE_VALUE(leader, term_is_pid);
40074012

40084013
int local_process_id = term_to_local_process_id(pid);
40094014
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
40104015
if (IS_NULL_PTR(target)) {
40114016
RAISE_ERROR(BADARG_ATOM);
40124017
}
40134018

4014-
target->group_leader = leader;
4019+
if (term_is_local_pid(leader)) {
4020+
// We cannot put leader term on the heap
4021+
mailbox_send_term_signal(target, SetGroupLeaderSignal, leader);
4022+
} else {
4023+
target->group_leader = leader;
4024+
}
4025+
40154026
globalcontext_get_process_unlock(ctx->global, target);
40164027
return TRUE_ATOM;
40174028
}

src/libAtomVM/opcodesswitch.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,15 @@ static void destroy_extended_registers(Context *ctx, unsigned int live)
10491049
context_process_flush_monitor_signal(ctx, flush_signal->ref_ticks, info); \
10501050
break; \
10511051
} \
1052+
case SetGroupLeaderSignal: { \
1053+
struct TermSignal *group_leader \
1054+
= CONTAINER_OF(signal_message, struct TermSignal, base); \
1055+
if (UNLIKELY(!context_process_signal_set_group_leader(ctx, group_leader))) { \
1056+
SET_ERROR(OUT_OF_MEMORY_ATOM); \
1057+
next_label = &&handle_error; \
1058+
} \
1059+
break; \
1060+
} \
10521061
case NormalMessage: { \
10531062
UNREACHABLE(); \
10541063
} \

tests/libs/estdlib/test_net_kernel.erl

Lines changed: 92 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ test() ->
3737
ok = test_rpc_loop_from_beam(Platform),
3838
ok = test_autoconnect_fail(Platform),
3939
ok = test_autoconnect_to_beam(Platform),
40+
ok = test_groupleader(Platform),
4041
ok;
4142
false ->
4243
io:format("~s: skipped\n", [?MODULE]),
@@ -156,21 +157,24 @@ test_autoconnect_to_beam(Platform) ->
156157
{ok, _NetKernelPid} = net_kernel_start(Platform, atomvm),
157158
Node = node(),
158159
erlang:set_cookie(Node, 'AtomVM'),
159-
spawn_link(fun() ->
160-
execute_command(
161-
Platform,
162-
"erl -sname otp -setcookie AtomVM -eval \""
163-
"register(beam, self()),"
164-
"F = fun(G) ->"
165-
" receive"
166-
" {Caller, ping} -> Caller ! {self(), pong}, G(G);"
167-
" {Caller, quit} -> Caller ! {self(), quit}"
168-
" after 5000 -> timeout"
169-
" end "
170-
"end, "
171-
"F(F).\" -s init stop -noshell"
172-
)
173-
end),
160+
{Pid, MonitorRef} = spawn_opt(
161+
fun() ->
162+
[] = execute_command(
163+
Platform,
164+
"erl -sname otp -setcookie AtomVM -eval \""
165+
"register(beam, self()),"
166+
"F = fun(G) ->"
167+
" receive"
168+
" {Caller, ping} -> Caller ! {self(), pong}, G(G);"
169+
" {Caller, quit} -> Caller ! {self(), quit}"
170+
" after 5000 -> exit(timeout)"
171+
" end "
172+
"end, "
173+
"F(F).\" -s init stop -noshell"
174+
)
175+
end,
176+
[link, monitor]
177+
),
174178
% Wait sufficiently for beam to be up, without connecting to it since
175179
% that's part of the test
176180
timer:sleep(1000),
@@ -200,6 +204,79 @@ test_autoconnect_to_beam(Platform) ->
200204
{OTPPid, quit} -> ok
201205
after 5000 -> timeout
202206
end,
207+
normal =
208+
receive
209+
{'DOWN', MonitorRef, process, Pid, Reason} -> Reason
210+
after 5000 -> timeout
211+
end,
212+
net_kernel:stop(),
213+
ok.
214+
215+
test_groupleader(Platform) ->
216+
{ok, _NetKernelPid} = net_kernel_start(Platform, atomvm),
217+
Node = node(),
218+
erlang:set_cookie(Node, 'AtomVM'),
219+
register(atomvm, self()),
220+
Parent = self(),
221+
{Pid, MonitorRef} = spawn_opt(
222+
fun() ->
223+
Result = execute_command(
224+
Platform,
225+
"erl -sname otp -setcookie AtomVM -eval \""
226+
"{atomvm, '" ++ atom_to_list(Node) ++
227+
"'} ! {beam, self()}, "
228+
"F = fun(G) ->"
229+
" receive"
230+
" {Caller, apply, M, F, A} -> Result = apply(M, F, A), Caller ! {self(), Result}, G(G);"
231+
" {Caller, quit} -> Caller ! {self(), quit}"
232+
" after 5000 -> exit(timeout)"
233+
" end "
234+
"end, "
235+
"F(F).\" -s init stop -noshell"
236+
),
237+
Parent ! {io_result, Result}
238+
end,
239+
[link, monitor]
240+
),
241+
BeamMainPid =
242+
receive
243+
{beam, BeamMainPid0} ->
244+
BeamMainPid0;
245+
{io_result, Result0} ->
246+
io:format("~s\n", [Result0]),
247+
exit(timeout)
248+
after 5000 -> exit(timeout)
249+
end,
250+
BeamMainPid ! {self(), apply, rpc, call, [Node, io, format, ["hello group leader"]]},
251+
ok =
252+
receive
253+
{BeamMainPid, Result} ->
254+
Result;
255+
{io_result, Result1} ->
256+
io:format("~s\n", [Result1]),
257+
exit(timeout)
258+
after 5000 -> exit(timeout)
259+
end,
260+
BeamMainPid ! {self(), quit},
261+
ok =
262+
receive
263+
{BeamMainPid, quit} ->
264+
ok;
265+
{io_result, Result2} ->
266+
io:format("~s\n", [Result2]),
267+
exit(timeout)
268+
after 5000 -> timeout
269+
end,
270+
"hello group leader" =
271+
receive
272+
{io_result, IOResult} -> IOResult
273+
after 5000 -> timeout
274+
end,
275+
normal =
276+
receive
277+
{'DOWN', MonitorRef, process, Pid, Reason} -> Reason
278+
after 5000 -> timeout
279+
end,
203280
net_kernel:stop(),
204281
ok.
205282

0 commit comments

Comments
 (0)