Skip to content

Commit 09fb68b

Browse files
committed
Distribution: add support for rpc from other nodes
- Also add epmd_disterl esp32 example Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent 6da1e93 commit 09fb68b

File tree

11 files changed

+315
-38
lines changed

11 files changed

+315
-38
lines changed

examples/erlang/esp32/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,4 @@ pack_runnable(sx127x sx127x eavmlib estdlib)
3737
pack_runnable(reformat_nvs reformat_nvs eavmlib)
3838
pack_runnable(uartecho uartecho eavmlib estdlib)
3939
pack_runnable(ledc_example ledc_example eavmlib estdlib)
40+
pack_runnable(epmd_disterl epmd_disterl eavmlib estdlib)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
%
2+
% This file is part of AtomVM.
3+
%
4+
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
5+
%
6+
% Licensed under the Apache License, Version 2.0 (the "License");
7+
% you may not use this file except in compliance with the License.
8+
% You may obtain a copy of the License at
9+
%
10+
% http://www.apache.org/licenses/LICENSE-2.0
11+
%
12+
% Unless required by applicable law or agreed to in writing, software
13+
% distributed under the License is distributed on an "AS IS" BASIS,
14+
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
% See the License for the specific language governing permissions and
16+
% limitations under the License.
17+
%
18+
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
19+
%
20+
21+
-module(epmd_disterl).
22+
23+
-export([start/0]).
24+
25+
start() ->
26+
Creds = [
27+
{ssid, "myssid"},
28+
{psk, "mypsk"}
29+
],
30+
case network:wait_for_sta(Creds, 30000) of
31+
{ok, {Address, _Netmask, _Gateway}} ->
32+
distribution_start(Address);
33+
Error ->
34+
io:format("An error occurred starting network: ~p~n", [Error])
35+
end.
36+
37+
distribution_start(Address) ->
38+
{ok, _EPMDPid} = epmd:start_link([]),
39+
{ok, _KernelPid} = kernel:start(normal, []),
40+
{X, Y, Z, T} = Address,
41+
Node = list_to_atom(lists:flatten(io_lib:format("atomvm@~B.~B.~B.~B", [X, Y, Z, T]))),
42+
{ok, _NetKernelPid} = net_kernel:start(Node, #{name_domain => longnames}),
43+
io:format("Distribution was started\n"),
44+
io:format("Node is ~p\n", [node()]),
45+
net_kernel:set_cookie(<<"AtomVM">>),
46+
io:format("Cookie is ~s\n", [net_kernel:get_cookie()]),
47+
register(disterl, self()),
48+
io:format(
49+
"This AtomVM node is waiting for 'quit' message, and this process is registered as 'disterl'\n"
50+
),
51+
io:format("On an OTP node with long names distribution, run:\n"),
52+
io:format("erlang:set_cookie('~s', 'AtomVM').\n", [Node]),
53+
io:format("{disterl, '~s'} ! quit.\n", [Node]),
54+
receive
55+
quit -> ok
56+
end.

libs/estdlib/src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ set(ERLANG_MODULES
3131
crypto
3232
dist_util
3333
erl_epmd
34+
erpc
3435
erts_debug
3536
ets
3637
gen_event

libs/estdlib/src/erpc.erl

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
%
2+
% This file is part of AtomVM.
3+
%
4+
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
5+
%
6+
% Licensed under the Apache License, Version 2.0 (the "License");
7+
% you may not use this file except in compliance with the License.
8+
% You may obtain a copy of the License at
9+
%
10+
% http://www.apache.org/licenses/LICENSE-2.0
11+
%
12+
% Unless required by applicable law or agreed to in writing, software
13+
% distributed under the License is distributed on an "AS IS" BASIS,
14+
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
% See the License for the specific language governing permissions and
16+
% limitations under the License.
17+
%
18+
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
19+
%
20+
21+
%%-----------------------------------------------------------------------------
22+
%% @doc An implementation of the Erlang/OTP erpc interface.
23+
%%
24+
%% This module implements a strict subset of the Erlang/OTP erpc
25+
%% interface.
26+
%% @end
27+
%%-----------------------------------------------------------------------------
28+
-module(erpc).
29+
30+
% api
31+
-export([
32+
execute_call/4
33+
]).
34+
35+
%%-----------------------------------------------------------------------------
36+
%% @param Reference reference of the request, passed in exit tuple
37+
%% @param Module module to call
38+
%% @param Func function to call
39+
%% @param Args argument of the call
40+
%% @doc Execute a call locally, exiting with the result.
41+
%% This function is called from rpc on other nodes using spawn_request BIF.
42+
%% @end
43+
%%-----------------------------------------------------------------------------
44+
-spec execute_call(Reference :: reference(), Module :: module(), Func :: atom(), Args :: [any()]) ->
45+
no_return().
46+
execute_call(Reference, Module, Func, Args) ->
47+
Reply =
48+
try
49+
Result = apply(Module, Func, Args),
50+
{Reference, return, Result}
51+
catch
52+
throw:Reason ->
53+
{Reference, throw, Reason};
54+
exit:Reason ->
55+
{Reference, exit, Reason};
56+
error:Reason:Stack ->
57+
{Reference, error, Reason, Stack}
58+
end,
59+
exit(Reply).

src/libAtomVM/defaultatoms.def

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,4 @@ X(INET_ATOM, "\x4", "inet")
178178
X(TIMEOUT_ATOM, "\x7", "timeout")
179179

180180
X(DIST_DATA_ATOM, "\x9", "dist_data")
181+
X(REQUEST_ATOM, "\x7", "request")

src/libAtomVM/dist_nifs.c

Lines changed: 107 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ enum
7070
OPERATION_ALIAS_SEND_TT = 34,
7171
};
7272

73+
enum
74+
{
75+
SPAWN_REPLY_FLAGS_LINK_CREATED = 1,
76+
SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2,
77+
};
78+
7379
struct DistributionPacket
7480
{
7581
struct ListHead head;
@@ -129,7 +135,7 @@ static void dist_connection_dtor(ErlNifEnv *caller_env, void *obj)
129135

130136
static void dist_enqueue_message(term control_message, term payload, struct DistConnection *connection, GlobalContext *global)
131137
{
132-
size_t control_message_size = 0; // some compilers including esp-idf 5.0.7 is not smart enough
138+
size_t control_message_size = 0; // some compilers including esp-idf 5.0.7 are not smart enough
133139
enum ExternalTermResult serialize_result = externalterm_compute_external_size(control_message, &control_message_size, global);
134140
if (LIKELY(serialize_result == EXTERNAL_TERM_OK)) {
135141
size_t payload_size = 0;
@@ -195,10 +201,7 @@ static void dist_connection_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pi
195201

196202
struct DistConnection *conn_obj = (struct DistConnection *) obj;
197203

198-
if (UNLIKELY(enif_compare_monitors(&conn_obj->connection_process_monitor, mon) == 0)) {
199-
struct RefcBinary *rsrc_refc = refc_binary_from_data(obj);
200-
refc_binary_decrement_refcount(rsrc_refc, caller_env->global);
201-
} else {
204+
if (enif_compare_monitors(&conn_obj->connection_process_monitor, mon) != 0) {
202205
struct ListHead *remote_monitors = synclist_wrlock(&conn_obj->remote_monitors);
203206
struct ListHead *item;
204207
LIST_FOR_EACH (item, remote_monitors) {
@@ -280,10 +283,6 @@ static term nif_erlang_setnode_3(Context *ctx, int argc, term argv[])
280283
list_prepend(dist_connections, &conn_obj->head);
281284
synclist_unlock(&ctx->global->dist_connections);
282285

283-
// Increment reference count as the resource should be alive until controller process dies
284-
struct RefcBinary *rsrc_refc = refc_binary_from_data(conn_obj);
285-
refc_binary_increment_refcount(rsrc_refc);
286-
287286
if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_RESOURCE_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
288287
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
289288
}
@@ -339,6 +338,38 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
339338
return result;
340339
}
341340

341+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx)
342+
{
343+
if (term_is_atom(target_proc)) {
344+
target_proc = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
345+
}
346+
int target_process_id = 0;
347+
if (term_is_local_pid(target_proc)) {
348+
target_process_id = term_to_local_process_id(target_proc);
349+
} else {
350+
RAISE_ERROR(BADARG_ATOM);
351+
}
352+
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
353+
monitor->target_proc = target_proc;
354+
monitor->pid_number = term_get_external_pid_process_id(from_pid);
355+
monitor->pid_serial = term_get_external_pid_serial(from_pid);
356+
monitor->ref_len = term_get_external_reference_len(monitor_ref);
357+
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
358+
if (target_process_id) {
359+
synclist_append(&conn_obj->remote_monitors, &monitor->head);
360+
ErlNifPid target_process_pid = target_process_id;
361+
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
362+
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
363+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
364+
free(monitor);
365+
}
366+
} else {
367+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
368+
free(monitor);
369+
}
370+
return OK_ATOM;
371+
}
372+
342373
static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
343374
{
344375
UNUSED(argc);
@@ -390,32 +421,8 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
390421
term from_pid = term_get_tuple_element(control, 1);
391422
term target_proc = term_get_tuple_element(control, 2);
392423
term monitor_ref = term_get_tuple_element(control, 3);
393-
if (term_is_atom(target_proc)) {
394-
target_proc = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
395-
}
396-
int target_process_id = 0;
397-
if (term_is_local_pid(target_proc)) {
398-
target_process_id = term_to_local_process_id(target_proc);
399-
} else {
400-
RAISE_ERROR(BADARG_ATOM);
401-
}
402-
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
403-
monitor->target_proc = target_proc;
404-
monitor->pid_number = term_get_external_pid_process_id(from_pid);
405-
monitor->pid_serial = term_get_external_pid_serial(from_pid);
406-
monitor->ref_len = term_get_external_reference_len(monitor_ref);
407-
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
408-
if (target_process_id) {
409-
synclist_append(&conn_obj->remote_monitors, &monitor->head);
410-
ErlNifPid target_process_pid = target_process_id;
411-
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
412-
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
413-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
414-
free(monitor);
415-
}
416-
} else {
417-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
418-
free(monitor);
424+
if (UNLIKELY(term_is_invalid_term(dist_monitor(conn_obj, from_pid, target_proc, monitor_ref, ctx)))) {
425+
return term_invalid_term();
419426
}
420427

421428
break;
@@ -443,6 +450,54 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
443450
synclist_unlock(&conn_obj->remote_monitors);
444451
break;
445452
}
453+
case OPERATION_SPAWN_REQUEST: {
454+
if (UNLIKELY(arity != 6)) {
455+
RAISE_ERROR(BADARG_ATOM);
456+
}
457+
term roots[4];
458+
roots[0] = argv[0];
459+
roots[1] = argv[1];
460+
roots[2] = control;
461+
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
462+
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
463+
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
464+
}
465+
control = roots[2];
466+
term arglist = roots[3];
467+
term mfa = term_get_tuple_element(control, 4);
468+
if (UNLIKELY(!term_is_tuple(mfa) || term_get_tuple_arity(mfa) != 3)) {
469+
RAISE_ERROR(BADARG_ATOM);
470+
}
471+
if (UNLIKELY(!term_is_list(arglist))) {
472+
RAISE_ERROR(BADARG_ATOM);
473+
}
474+
term reqid = term_get_tuple_element(control, 1);
475+
term from = term_get_tuple_element(control, 2);
476+
if (UNLIKELY(!term_is_pid(from))) {
477+
RAISE_ERROR(BADARG_ATOM);
478+
}
479+
// term groupleader = term_get_tuple_element(control, 3);
480+
// TODO: handle groupleader which is an externalpid
481+
term options = term_get_tuple_element(control, 5);
482+
483+
term request_tuple = term_alloc_tuple(4, &ctx->heap);
484+
term_put_tuple_element(request_tuple, 0, roots[0]);
485+
term_put_tuple_element(request_tuple, 1, reqid);
486+
term_put_tuple_element(request_tuple, 2, from);
487+
term_put_tuple_element(request_tuple, 3, options);
488+
term request_opt = term_alloc_tuple(2, &ctx->heap);
489+
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
490+
term_put_tuple_element(request_opt, 1, request_tuple);
491+
term spawn_opts = term_list_prepend(request_opt, term_nil(), &ctx->heap);
492+
493+
// reuse roots for args
494+
roots[0] = term_get_tuple_element(mfa, 0);
495+
roots[1] = term_get_tuple_element(mfa, 1);
496+
roots[2] = arglist;
497+
roots[3] = spawn_opts;
498+
nif_erlang_spawn_opt(ctx, 4, roots);
499+
break;
500+
}
446501
default:
447502
printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation));
448503
RAISE_ERROR(BADARG_ATOM);
@@ -468,6 +523,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
468523
synclist_unlock(&ctx->global->dist_connections);
469524
}
470525

526+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global)
527+
{
528+
int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0)
529+
| (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0);
530+
// allocate tuple
531+
BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(5), heap)
532+
term control_message = term_alloc_tuple(5, &heap);
533+
term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SPAWN_REPLY));
534+
term_put_tuple_element(control_message, 1, req_id);
535+
term_put_tuple_element(control_message, 2, to_pid);
536+
term_put_tuple_element(control_message, 3, term_from_int(flags));
537+
term_put_tuple_element(control_message, 4, result);
538+
539+
dist_enqueue_message(control_message, term_invalid_term(), connection, global);
540+
END_WITH_STACK_HEAP(heap, global)
541+
}
542+
471543
const struct Nif setnode_3_nif = {
472544
.base.type = NIFFunctionType,
473545
.nif_ptr = nif_erlang_setnode_3

src/libAtomVM/dist_nifs.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,35 @@ extern const struct Nif dist_ctrl_get_data_notification_nif;
4141
extern const struct Nif dist_ctrl_get_data_nif;
4242
extern const struct Nif dist_ctrl_put_data_nif;
4343

44+
struct DistConnection;
45+
4446
void dist_send_message(term external_pid, term payload, Context *ctx);
4547

48+
/**
49+
* @doc Setup a monitor on a local process for a distributed process.
50+
* @end
51+
* @param conn_obj object of the connection
52+
* @param from_pid remote pid setting up the monitor
53+
* @param target_proc atom (for registered process) or pid of the local
54+
* process to monitor
55+
* @param monitor_ref reference used for monitor
56+
* @param ctx context for memory allocation
57+
*/
58+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx);
59+
60+
/**
61+
* @doc Send a spawn reply signal to a node
62+
* @end
63+
* @param conn_obj object of the connection
64+
* @param req_id reference identifying the request
65+
* @param to_pid (remote) process id identifying the caller
66+
* @param link if a link was created
67+
* @param monitor if a monitor was created
68+
* @param result pid of the spawned process or atom for an error
69+
* @param ctx context for memory allocation
70+
*/
71+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global);
72+
4673
#ifdef __cplusplus
4774
}
4875
#endif

0 commit comments

Comments
 (0)