Skip to content

Commit ec336e4

Browse files
committed
Merge pull request #1468 from pguyot/w02/erlang-distribution-03
Distribution: add support for autoconnect Allow connections from AtomVM to other nodes (OTP or AtomVM). These changes are made under both the "Apache 2.0" and the "GNU Lesser General Public License 2.1 or later" license terms (dual license). SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
2 parents 41d90c8 + 47bbff1 commit ec336e4

File tree

11 files changed

+339
-37
lines changed

11 files changed

+339
-37
lines changed

libs/estdlib/src/dist_util.erl

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,27 @@ handshake_other_started(#hs_data{socket = Socket, f_recv = Recv} = HSData0) ->
229229
end.
230230

231231
-spec handshake_we_started(#hs_data{}) -> no_return().
232-
handshake_we_started(#hs_data{}) -> ok.
232+
handshake_we_started(#hs_data{} = HSData0) ->
233+
HSData1 = HSData0#hs_data{
234+
other_started = false,
235+
this_flags = ?MANDATORY_DFLAGS
236+
},
237+
send_name(HSData1),
238+
case recv_status(HSData1) of
239+
<<"ok">> -> ok;
240+
<<"ok_simultaneous">> -> ok;
241+
<<"nok">> -> ?shutdown({HSData1#hs_data.other_node, simultaneous});
242+
<<"alive">> -> send_status(<<"true">>, HSData1);
243+
Other -> ?shutdown({HSData1#hs_data.other_node, {unexpected, Other}})
244+
end,
245+
Cookie = net_kernel:get_cookie(HSData1#hs_data.other_node),
246+
{OtherChallenge, OtherFlags, Creation} = recv_challenge(HSData1),
247+
check_flags(OtherFlags, HSData1),
248+
<<MyChallenge:32>> = crypto:strong_rand_bytes(4),
249+
send_challenge_reply(Cookie, OtherChallenge, MyChallenge, HSData1),
250+
OtherDigest = recv_challenge_ack(HSData1),
251+
check_challenge(Cookie, MyChallenge, OtherDigest, HSData1),
252+
connection(HSData1, Creation).
233253

234254
% We are connected
235255
-spec connection(#hs_data{}, non_neg_integer()) -> no_return().
@@ -359,6 +379,20 @@ check_flags(Flags0, HSData) ->
359379
?shutdown(Reason)
360380
end.
361381

382+
% send name
383+
send_name(
384+
#hs_data{socket = Socket, f_send = Send, this_node = ThisNode, this_flags = ThisFlags} = HSData
385+
) ->
386+
Creation = atomvm:get_creation(),
387+
NodeName = atom_to_binary(ThisNode, latin1),
388+
NameLen = byte_size(NodeName),
389+
case Send(Socket, <<$N, ThisFlags:64, Creation:32, NameLen:16, NodeName/binary>>) of
390+
{error, _} = Error ->
391+
?shutdown2({HSData#hs_data.other_node, Socket}, {send_name_failed, Error});
392+
ok ->
393+
ok
394+
end.
395+
362396
% Ensure name is somewhat valid
363397
-spec check_name(binary()) -> ok.
364398
check_name(Name) ->
@@ -378,13 +412,13 @@ send_status(Status, #hs_data{socket = Socket, f_send = Send} = HSData) ->
378412
ok
379413
end.
380414

381-
-spec recv_status_reply(#hs_data{}) -> binary().
382-
recv_status_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
415+
-spec recv_status(#hs_data{}) -> binary().
416+
recv_status(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
383417
case Recv(Socket, 0, infinity) of
384418
{ok, <<$s, Result/binary>>} ->
385419
Result;
386420
{ok, Other} ->
387-
?shutdown({HSData#hs_data.other_node, {unexpected, recv_status_reply, Other}});
421+
?shutdown({HSData#hs_data.other_node, {unexpected, recv_status, Other}});
388422
{error, Reason} ->
389423
?shutdown2({HSData#hs_data.other_node, recv_error}, Reason)
390424
end.
@@ -403,7 +437,7 @@ mark_pending(#hs_data{kernel_pid = Kernel, this_node = ThisNode, other_node = Ot
403437
alive ->
404438
send_status(<<"alive">>, HSData),
405439
reset_timer(HSData#hs_data.timer),
406-
case recv_status_reply(HSData) of
440+
case recv_status(HSData) of
407441
<<"true">> -> ok;
408442
<<"false">> -> ?shutdown(OtherNode);
409443
Other -> ?shutdown({OtherNode, {unexpected, Other}})
@@ -434,6 +468,28 @@ send_challenge(
434468
ok
435469
end.
436470

471+
recv_challenge(
472+
#hs_data{other_node = OtherNode, socket = Socket, f_recv = Recv} = HSData
473+
) ->
474+
case Recv(Socket, 0, infinity) of
475+
{ok, <<
476+
$N, OtherFlags:64, Challenge:32, OtherCreation:32, _OtherNameLen:16, OtherName/binary
477+
>>} ->
478+
case atom_to_binary(OtherNode, utf8) =/= OtherName of
479+
true ->
480+
?shutdown({
481+
HSData#hs_data.other_node, {mismatch, recv_challenge, OtherNode, OtherName}
482+
});
483+
false ->
484+
ok
485+
end,
486+
{Challenge, OtherFlags, OtherCreation};
487+
{ok, Other} ->
488+
?shutdown({HSData#hs_data.other_node, {unexpected, recv_challenge, Other}});
489+
{error, Reason} ->
490+
?shutdown2({HSData#hs_data.other_node, recv_error}, Reason)
491+
end.
492+
437493
-spec recv_challenge_reply(#hs_data{}) -> {non_neg_integer(), binary()}.
438494
recv_challenge_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
439495
case Recv(Socket, 0, infinity) of
@@ -445,6 +501,23 @@ recv_challenge_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
445501
?shutdown2({HSData#hs_data.other_node, recv_error}, Reason)
446502
end.
447503

504+
-spec send_challenge_reply(
505+
Cookie :: binary(),
506+
OtherChallenge :: non_neg_integer(),
507+
MyChallenge :: non_neg_integer(),
508+
#hs_data{}
509+
) -> ok.
510+
send_challenge_reply(
511+
Cookie, OtherChallenge, MyChallenge, #hs_data{socket = Socket, f_send = Send} = HSData
512+
) ->
513+
Digest = gen_digest(Cookie, OtherChallenge),
514+
case Send(Socket, <<$r, MyChallenge:32, Digest:16/binary>>) of
515+
{error, _} = Error ->
516+
?shutdown2({HSData#hs_data.other_node, Socket}, {send_challenge_reply_failed, Error});
517+
ok ->
518+
ok
519+
end.
520+
448521
-spec check_challenge(
449522
Cookie :: binary(), Challenge :: non_neg_integer(), Digest :: binary(), #hs_data{}
450523
) -> ok.
@@ -470,6 +543,17 @@ send_challenge_ack(Cookie, Challenge, #hs_data{socket = Socket, f_send = Send} =
470543
ok
471544
end.
472545

546+
-spec recv_challenge_ack(#hs_data{}) -> binary().
547+
recv_challenge_ack(#hs_data{socket = Socket, f_recv = Recv} = HSData) ->
548+
case Recv(Socket, 0, infinity) of
549+
{ok, <<$a, Digest/binary>>} ->
550+
Digest;
551+
{ok, Other} ->
552+
?shutdown({HSData#hs_data.other_node, {unexpected, recv_challenge_ack, Other}});
553+
{error, _} = Error ->
554+
?shutdown2({HSData#hs_data.other_node, Socket}, {recv_challenge_ack, Error})
555+
end.
556+
473557
-spec shutdown(atom(), non_neg_integer(), term()) -> no_return().
474558
shutdown(Module, Line, Data) ->
475559
shutdown(Module, Line, Data, shutdown).

libs/estdlib/src/net_kernel.erl

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -218,13 +218,17 @@ handle_call(
218218
case maps:find(OtherNode, Connections) of
219219
error ->
220220
{reply, ok, State0#state{
221-
connections = maps:put(OtherNode, {pending, ConnPid}, Connections)
221+
connections = maps:put(OtherNode, {pending, ConnPid, undefined}, Connections)
222222
}};
223-
{ok, {pending, OtherConnPid}} when OtherNode > ThisNode ->
223+
{ok, {pending, undefined, DHandle}} ->
224+
{reply, ok, State0#state{
225+
connections = maps:put(OtherNode, {pending, ConnPid, DHandle}, Connections)
226+
}};
227+
{ok, {pending, OtherConnPid, DHandle}} when OtherNode > ThisNode ->
224228
{reply, {ok_simultaneous, OtherConnPid}, State0#state{
225-
connections = maps:update(OtherNode, {pending, ConnPid}, Connections)
229+
connections = maps:update(OtherNode, {pending, ConnPid, DHandle}, Connections)
226230
}};
227-
{ok, {pending, _OtherConnPid}} ->
231+
{ok, {pending, _OtherConnPid, _DHandle}} ->
228232
{reply, nok, State0};
229233
{ok, {alive, _ConnPid, _Address}} ->
230234
{reply, alive, State0}
@@ -296,12 +300,29 @@ handle_info({'EXIT', Pid, _Reason}, #state{connections = Connections} = State) -
296300
fun(_Node, Status) ->
297301
case Status of
298302
{alive, Pid, _Address} -> false;
299-
{pending, Pid} -> false;
303+
{pending, Pid, _DHandle} -> false;
300304
_ -> true
301305
end
302306
end,
303307
Connections
304308
),
309+
{noreply, State#state{connections = NewConnections}};
310+
handle_info(
311+
{connect, OtherNode, DHandle},
312+
#state{connections = Connections, node = MyNode, longnames = Longnames, proto_dist = ProtoDist} =
313+
State
314+
) ->
315+
% ensure DHandle is not garbage collected until setup failed or succeeded
316+
NewConnections =
317+
case maps:find(OtherNode, Connections) of
318+
error ->
319+
ProtoDist:setup(OtherNode, normal, MyNode, Longnames, ?SETUPTIME),
320+
maps:put(OtherNode, {pending, undefined, DHandle}, Connections);
321+
{ok, {pending, ConnPid, _}} ->
322+
maps:put(OtherNode, {pending, ConnPid, DHandle}, Connections);
323+
{ok, {alive, _ConnPid, _Address}} ->
324+
Connections
325+
end,
305326
{noreply, State#state{connections = NewConnections}}.
306327

307328
%% @hidden

libs/estdlib/src/socket_dist.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ do_setup(Kernel, Node, Type, MyNode, _LongOrShortNames, SetupTime) ->
145145
of
146146
ok ->
147147
{ok, DistController} = socket_dist_controller:start(Sock),
148+
true = socket_dist_controller:supervisor(
149+
DistController, self()
150+
),
148151
HSData = hs_data(
149152
Kernel,
150153
MyNode,

src/libAtomVM/defaultatoms.def

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,3 +179,4 @@ X(TIMEOUT_ATOM, "\x7", "timeout")
179179

180180
X(DIST_DATA_ATOM, "\x9", "dist_data")
181181
X(REQUEST_ATOM, "\x7", "request")
182+
X(CONNECT_ATOM, "\x7", "connect")

0 commit comments

Comments
 (0)