Skip to content

Commit 41d90c8

Browse files
committed
Merge pull request #1467 from pguyot/w02/erlang-distribution-02
Distribution: add support for rpc call from other nodes These changes are made under both the "Apache 2.0" and the "GNU Lesser General Public License 2.1 or later" license terms (dual license). SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
2 parents 6da1e93 + 09fb68b commit 41d90c8

File tree

11 files changed

+315
-38
lines changed

11 files changed

+315
-38
lines changed

examples/erlang/esp32/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,4 @@ pack_runnable(sx127x sx127x eavmlib estdlib)
3737
pack_runnable(reformat_nvs reformat_nvs eavmlib)
3838
pack_runnable(uartecho uartecho eavmlib estdlib)
3939
pack_runnable(ledc_example ledc_example eavmlib estdlib)
40+
pack_runnable(epmd_disterl epmd_disterl eavmlib estdlib)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
%
2+
% This file is part of AtomVM.
3+
%
4+
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
5+
%
6+
% Licensed under the Apache License, Version 2.0 (the "License");
7+
% you may not use this file except in compliance with the License.
8+
% You may obtain a copy of the License at
9+
%
10+
% http://www.apache.org/licenses/LICENSE-2.0
11+
%
12+
% Unless required by applicable law or agreed to in writing, software
13+
% distributed under the License is distributed on an "AS IS" BASIS,
14+
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
% See the License for the specific language governing permissions and
16+
% limitations under the License.
17+
%
18+
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
19+
%
20+
21+
-module(epmd_disterl).
22+
23+
-export([start/0]).
24+
25+
start() ->
26+
Creds = [
27+
{ssid, "myssid"},
28+
{psk, "mypsk"}
29+
],
30+
case network:wait_for_sta(Creds, 30000) of
31+
{ok, {Address, _Netmask, _Gateway}} ->
32+
distribution_start(Address);
33+
Error ->
34+
io:format("An error occurred starting network: ~p~n", [Error])
35+
end.
36+
37+
distribution_start(Address) ->
38+
{ok, _EPMDPid} = epmd:start_link([]),
39+
{ok, _KernelPid} = kernel:start(normal, []),
40+
{X, Y, Z, T} = Address,
41+
Node = list_to_atom(lists:flatten(io_lib:format("atomvm@~B.~B.~B.~B", [X, Y, Z, T]))),
42+
{ok, _NetKernelPid} = net_kernel:start(Node, #{name_domain => longnames}),
43+
io:format("Distribution was started\n"),
44+
io:format("Node is ~p\n", [node()]),
45+
net_kernel:set_cookie(<<"AtomVM">>),
46+
io:format("Cookie is ~s\n", [net_kernel:get_cookie()]),
47+
register(disterl, self()),
48+
io:format(
49+
"This AtomVM node is waiting for 'quit' message, and this process is registered as 'disterl'\n"
50+
),
51+
io:format("On an OTP node with long names distribution, run:\n"),
52+
io:format("erlang:set_cookie('~s', 'AtomVM').\n", [Node]),
53+
io:format("{disterl, '~s'} ! quit.\n", [Node]),
54+
receive
55+
quit -> ok
56+
end.

libs/estdlib/src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ set(ERLANG_MODULES
3131
crypto
3232
dist_util
3333
erl_epmd
34+
erpc
3435
erts_debug
3536
ets
3637
gen_event

libs/estdlib/src/erpc.erl

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
%
2+
% This file is part of AtomVM.
3+
%
4+
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
5+
%
6+
% Licensed under the Apache License, Version 2.0 (the "License");
7+
% you may not use this file except in compliance with the License.
8+
% You may obtain a copy of the License at
9+
%
10+
% http://www.apache.org/licenses/LICENSE-2.0
11+
%
12+
% Unless required by applicable law or agreed to in writing, software
13+
% distributed under the License is distributed on an "AS IS" BASIS,
14+
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
% See the License for the specific language governing permissions and
16+
% limitations under the License.
17+
%
18+
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
19+
%
20+
21+
%%-----------------------------------------------------------------------------
22+
%% @doc An implementation of the Erlang/OTP erpc interface.
23+
%%
24+
%% This module implements a strict subset of the Erlang/OTP erpc
25+
%% interface.
26+
%% @end
27+
%%-----------------------------------------------------------------------------
28+
-module(erpc).
29+
30+
% api
31+
-export([
32+
execute_call/4
33+
]).
34+
35+
%%-----------------------------------------------------------------------------
36+
%% @param Reference reference of the request, passed in exit tuple
37+
%% @param Module module to call
38+
%% @param Func function to call
39+
%% @param Args argument of the call
40+
%% @doc Execute a call locally, exiting with the result.
41+
%% This function is called from rpc on other nodes using spawn_request BIF.
42+
%% @end
43+
%%-----------------------------------------------------------------------------
44+
-spec execute_call(Reference :: reference(), Module :: module(), Func :: atom(), Args :: [any()]) ->
45+
no_return().
46+
execute_call(Reference, Module, Func, Args) ->
47+
Reply =
48+
try
49+
Result = apply(Module, Func, Args),
50+
{Reference, return, Result}
51+
catch
52+
throw:Reason ->
53+
{Reference, throw, Reason};
54+
exit:Reason ->
55+
{Reference, exit, Reason};
56+
error:Reason:Stack ->
57+
{Reference, error, Reason, Stack}
58+
end,
59+
exit(Reply).

src/libAtomVM/defaultatoms.def

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,4 @@ X(INET_ATOM, "\x4", "inet")
178178
X(TIMEOUT_ATOM, "\x7", "timeout")
179179

180180
X(DIST_DATA_ATOM, "\x9", "dist_data")
181+
X(REQUEST_ATOM, "\x7", "request")

src/libAtomVM/dist_nifs.c

Lines changed: 107 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ enum
7070
OPERATION_ALIAS_SEND_TT = 34,
7171
};
7272

73+
enum
74+
{
75+
SPAWN_REPLY_FLAGS_LINK_CREATED = 1,
76+
SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2,
77+
};
78+
7379
struct DistributionPacket
7480
{
7581
struct ListHead head;
@@ -129,7 +135,7 @@ static void dist_connection_dtor(ErlNifEnv *caller_env, void *obj)
129135

130136
static void dist_enqueue_message(term control_message, term payload, struct DistConnection *connection, GlobalContext *global)
131137
{
132-
size_t control_message_size = 0; // some compilers including esp-idf 5.0.7 is not smart enough
138+
size_t control_message_size = 0; // some compilers including esp-idf 5.0.7 are not smart enough
133139
enum ExternalTermResult serialize_result = externalterm_compute_external_size(control_message, &control_message_size, global);
134140
if (LIKELY(serialize_result == EXTERNAL_TERM_OK)) {
135141
size_t payload_size = 0;
@@ -195,10 +201,7 @@ static void dist_connection_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pi
195201

196202
struct DistConnection *conn_obj = (struct DistConnection *) obj;
197203

198-
if (UNLIKELY(enif_compare_monitors(&conn_obj->connection_process_monitor, mon) == 0)) {
199-
struct RefcBinary *rsrc_refc = refc_binary_from_data(obj);
200-
refc_binary_decrement_refcount(rsrc_refc, caller_env->global);
201-
} else {
204+
if (enif_compare_monitors(&conn_obj->connection_process_monitor, mon) != 0) {
202205
struct ListHead *remote_monitors = synclist_wrlock(&conn_obj->remote_monitors);
203206
struct ListHead *item;
204207
LIST_FOR_EACH (item, remote_monitors) {
@@ -280,10 +283,6 @@ static term nif_erlang_setnode_3(Context *ctx, int argc, term argv[])
280283
list_prepend(dist_connections, &conn_obj->head);
281284
synclist_unlock(&ctx->global->dist_connections);
282285

283-
// Increment reference count as the resource should be alive until controller process dies
284-
struct RefcBinary *rsrc_refc = refc_binary_from_data(conn_obj);
285-
refc_binary_increment_refcount(rsrc_refc);
286-
287286
if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_RESOURCE_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
288287
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
289288
}
@@ -339,6 +338,38 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
339338
return result;
340339
}
341340

341+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx)
342+
{
343+
if (term_is_atom(target_proc)) {
344+
target_proc = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
345+
}
346+
int target_process_id = 0;
347+
if (term_is_local_pid(target_proc)) {
348+
target_process_id = term_to_local_process_id(target_proc);
349+
} else {
350+
RAISE_ERROR(BADARG_ATOM);
351+
}
352+
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
353+
monitor->target_proc = target_proc;
354+
monitor->pid_number = term_get_external_pid_process_id(from_pid);
355+
monitor->pid_serial = term_get_external_pid_serial(from_pid);
356+
monitor->ref_len = term_get_external_reference_len(monitor_ref);
357+
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
358+
if (target_process_id) {
359+
synclist_append(&conn_obj->remote_monitors, &monitor->head);
360+
ErlNifPid target_process_pid = target_process_id;
361+
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
362+
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
363+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
364+
free(monitor);
365+
}
366+
} else {
367+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
368+
free(monitor);
369+
}
370+
return OK_ATOM;
371+
}
372+
342373
static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
343374
{
344375
UNUSED(argc);
@@ -390,32 +421,8 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
390421
term from_pid = term_get_tuple_element(control, 1);
391422
term target_proc = term_get_tuple_element(control, 2);
392423
term monitor_ref = term_get_tuple_element(control, 3);
393-
if (term_is_atom(target_proc)) {
394-
target_proc = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
395-
}
396-
int target_process_id = 0;
397-
if (term_is_local_pid(target_proc)) {
398-
target_process_id = term_to_local_process_id(target_proc);
399-
} else {
400-
RAISE_ERROR(BADARG_ATOM);
401-
}
402-
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
403-
monitor->target_proc = target_proc;
404-
monitor->pid_number = term_get_external_pid_process_id(from_pid);
405-
monitor->pid_serial = term_get_external_pid_serial(from_pid);
406-
monitor->ref_len = term_get_external_reference_len(monitor_ref);
407-
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
408-
if (target_process_id) {
409-
synclist_append(&conn_obj->remote_monitors, &monitor->head);
410-
ErlNifPid target_process_pid = target_process_id;
411-
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
412-
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
413-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
414-
free(monitor);
415-
}
416-
} else {
417-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
418-
free(monitor);
424+
if (UNLIKELY(term_is_invalid_term(dist_monitor(conn_obj, from_pid, target_proc, monitor_ref, ctx)))) {
425+
return term_invalid_term();
419426
}
420427

421428
break;
@@ -443,6 +450,54 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
443450
synclist_unlock(&conn_obj->remote_monitors);
444451
break;
445452
}
453+
case OPERATION_SPAWN_REQUEST: {
454+
if (UNLIKELY(arity != 6)) {
455+
RAISE_ERROR(BADARG_ATOM);
456+
}
457+
term roots[4];
458+
roots[0] = argv[0];
459+
roots[1] = argv[1];
460+
roots[2] = control;
461+
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
462+
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
463+
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
464+
}
465+
control = roots[2];
466+
term arglist = roots[3];
467+
term mfa = term_get_tuple_element(control, 4);
468+
if (UNLIKELY(!term_is_tuple(mfa) || term_get_tuple_arity(mfa) != 3)) {
469+
RAISE_ERROR(BADARG_ATOM);
470+
}
471+
if (UNLIKELY(!term_is_list(arglist))) {
472+
RAISE_ERROR(BADARG_ATOM);
473+
}
474+
term reqid = term_get_tuple_element(control, 1);
475+
term from = term_get_tuple_element(control, 2);
476+
if (UNLIKELY(!term_is_pid(from))) {
477+
RAISE_ERROR(BADARG_ATOM);
478+
}
479+
// term groupleader = term_get_tuple_element(control, 3);
480+
// TODO: handle groupleader which is an externalpid
481+
term options = term_get_tuple_element(control, 5);
482+
483+
term request_tuple = term_alloc_tuple(4, &ctx->heap);
484+
term_put_tuple_element(request_tuple, 0, roots[0]);
485+
term_put_tuple_element(request_tuple, 1, reqid);
486+
term_put_tuple_element(request_tuple, 2, from);
487+
term_put_tuple_element(request_tuple, 3, options);
488+
term request_opt = term_alloc_tuple(2, &ctx->heap);
489+
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
490+
term_put_tuple_element(request_opt, 1, request_tuple);
491+
term spawn_opts = term_list_prepend(request_opt, term_nil(), &ctx->heap);
492+
493+
// reuse roots for args
494+
roots[0] = term_get_tuple_element(mfa, 0);
495+
roots[1] = term_get_tuple_element(mfa, 1);
496+
roots[2] = arglist;
497+
roots[3] = spawn_opts;
498+
nif_erlang_spawn_opt(ctx, 4, roots);
499+
break;
500+
}
446501
default:
447502
printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation));
448503
RAISE_ERROR(BADARG_ATOM);
@@ -468,6 +523,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
468523
synclist_unlock(&ctx->global->dist_connections);
469524
}
470525

526+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global)
527+
{
528+
int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0)
529+
| (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0);
530+
// allocate tuple
531+
BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(5), heap)
532+
term control_message = term_alloc_tuple(5, &heap);
533+
term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SPAWN_REPLY));
534+
term_put_tuple_element(control_message, 1, req_id);
535+
term_put_tuple_element(control_message, 2, to_pid);
536+
term_put_tuple_element(control_message, 3, term_from_int(flags));
537+
term_put_tuple_element(control_message, 4, result);
538+
539+
dist_enqueue_message(control_message, term_invalid_term(), connection, global);
540+
END_WITH_STACK_HEAP(heap, global)
541+
}
542+
471543
const struct Nif setnode_3_nif = {
472544
.base.type = NIFFunctionType,
473545
.nif_ptr = nif_erlang_setnode_3

src/libAtomVM/dist_nifs.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,35 @@ extern const struct Nif dist_ctrl_get_data_notification_nif;
4141
extern const struct Nif dist_ctrl_get_data_nif;
4242
extern const struct Nif dist_ctrl_put_data_nif;
4343

44+
struct DistConnection;
45+
4446
void dist_send_message(term external_pid, term payload, Context *ctx);
4547

48+
/**
49+
* @doc Setup a monitor on a local process for a distributed process.
50+
* @end
51+
* @param conn_obj object of the connection
52+
* @param from_pid remote pid setting up the monitor
53+
* @param target_proc atom (for registered process) or pid of the local
54+
* process to monitor
55+
* @param monitor_ref reference used for monitor
56+
* @param ctx context for memory allocation
57+
*/
58+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx);
59+
60+
/**
61+
* @doc Send a spawn reply signal to a node
62+
* @end
63+
* @param conn_obj object of the connection
64+
* @param req_id reference identifying the request
65+
* @param to_pid (remote) process id identifying the caller
66+
* @param link if a link was created
67+
* @param monitor if a monitor was created
68+
* @param result pid of the spawned process or atom for an error
69+
* @param ctx context for memory allocation
70+
*/
71+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global);
72+
4673
#ifdef __cplusplus
4774
}
4875
#endif

0 commit comments

Comments
 (0)