Skip to content

Commit e8b18f7

Browse files
committed
Add support for asynchronous socket API for recv and recvfrom.
Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent 44191fc commit e8b18f7

File tree

18 files changed

+977
-254
lines changed

18 files changed

+977
-254
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2424
- Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms
2525
- Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms.
2626
- Added `erl_epmd` client implementation to epmd using `socket` module
27+
- Added support for socket asynchronous API for `recv` and `recvfrom`.
2728

2829
### Changed
2930

libs/estdlib/src/gen_tcp_socket.erl

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,10 +339,7 @@ handle_cast(_Request, State) ->
339339
{noreply, State}.
340340

341341
%% @hidden
342-
handle_info({select, _Socket, Ref, ready_input}, State) ->
343-
?LOG_DEBUG("handle_info [~p], ~p]", [
344-
{select, _Socket, Ref, ready_input}, State
345-
]),
342+
handle_info({'$socket', _Socket, select, Ref}, State) ->
346343
%% TODO cancel timer
347344
case maps:get(Ref, State#state.pending_selects, undefined) of
348345
undefined ->
@@ -366,6 +363,28 @@ handle_info({select, _Socket, Ref, ready_input}, State) ->
366363
pending_selects = maps:remove(Ref, State#state.pending_selects)
367364
}}
368365
end;
366+
handle_info({'$socket', Socket, abort, {Ref, closed}}, State) ->
367+
%% TODO cancel timer
368+
case maps:get(Ref, State#state.pending_selects, undefined) of
369+
undefined ->
370+
?LOG_WARNING("Unable to find select ref ~p in pending selects", [Ref]),
371+
socket:nif_select_stop(Socket),
372+
{noreply, State};
373+
{accept, From, _AcceptingProc, _Timeout} ->
374+
socket:nif_select_stop(Socket),
375+
gen_server:reply(From, {error, closed}),
376+
{noreply, State};
377+
active ->
378+
WrappedSocket = {?GEN_TCP_MONIKER, self(), ?MODULE},
379+
State#state.controlling_process ! {tcp_closed, WrappedSocket},
380+
{noreply, State};
381+
{passive, From, _Length, _Timeout} ->
382+
socket:nif_select_stop(Socket),
383+
gen_server:reply(From, {error, closed}),
384+
{noreply, State#state{
385+
pending_selects = maps:remove(Ref, State#state.pending_selects)
386+
}}
387+
end;
369388
handle_info({timeout, Ref, From}, State) ->
370389
?LOG_DEBUG("handle_info [~p], ~p]", [
371390
{timeout, Ref, From}, State

libs/estdlib/src/gen_udp_socket.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ handle_cast(_Request, State) ->
242242
{noreply, State}.
243243

244244
%% @hidden
245-
handle_info({select, _Socket, Ref, ready_input}, State) ->
245+
handle_info({'$socket', _Socket, select, Ref}, State) ->
246246
case maps:get(Ref, State#state.pending_selects, undefined) of
247247
undefined ->
248248
?LOG_INFO("Unable to find select ref ~p in pending selects", [Ref]),

libs/estdlib/src/socket.erl

Lines changed: 156 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -245,22 +245,23 @@ accept(Socket, Timeout) ->
245245
case ?MODULE:nif_select_read(Socket, Ref) of
246246
ok ->
247247
receive
248-
{select, _AcceptedSocket, Ref, ready_input} ->
248+
{'$socket', Socket, select, Ref} ->
249249
case ?MODULE:nif_accept(Socket) of
250250
{error, closed} = E ->
251251
?MODULE:nif_select_stop(Socket),
252252
E;
253253
R ->
254254
R
255255
end;
256-
{closed, Ref} ->
256+
{'$socket', Socket, abort, {Ref, closed}} ->
257257
% socket was closed by another process
258258
% TODO: we need to handle:
259259
% (a) SELECT_STOP being scheduled
260-
% (b) flush of messages as we can have both
261-
% {closed, Ref} and {select, _, Ref, _} in the
260+
% (b) flush of messages as we can have both in the
262261
% queue
263-
{error, closed}
262+
{error, closed};
263+
Other ->
264+
{error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
264265
after Timeout ->
265266
{error, timeout}
266267
end;
@@ -299,25 +300,60 @@ recv(Socket, Length) ->
299300
%% `{ok, Data} = socket:recv(ConnectedSocket)'
300301
%% @end
301302
%%-----------------------------------------------------------------------------
302-
-spec recv(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
303-
{ok, Data :: binary()} | {error, Reason :: term()}.
303+
-spec recv(
304+
Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
305+
) ->
306+
{ok, Data :: binary()}
307+
| {select, {select_info, recvfrom, reference()}}
308+
| {select, {{select_info, recvfrom, reference()}, Data :: binary()}}
309+
| {error, Reason :: term()}.
310+
recv(Socket, Length, 0) ->
311+
recv0_noselect(Socket, Length);
312+
recv(Socket, 0, Timeout) when is_integer(Timeout) orelse Timeout =:= infinity ->
313+
recv0(Socket, 0, Timeout);
314+
recv(Socket, Length, nowait) ->
315+
recv0_nowait(Socket, Length, erlang:make_ref());
316+
recv(Socket, Length, Ref) when is_reference(Ref) ->
317+
recv0_nowait(Socket, Length, Ref);
304318
recv(Socket, Length, Timeout) ->
319+
case ?MODULE:getopt(Socket, {socket, type}) of
320+
{ok, stream} when Timeout =/= infinity ->
321+
recv0_r(Socket, Length, Timeout, erlang:system_time(millisecond) + Timeout, []);
322+
{ok, stream} when Timeout =:= infinity ->
323+
recv0_r(Socket, Length, Timeout, undefined, []);
324+
_ ->
325+
recv0(Socket, Length, Timeout)
326+
end.
327+
328+
recv0_noselect(Socket, Length) ->
329+
case ?MODULE:nif_recv(Socket, Length) of
330+
{error, _} = E ->
331+
E;
332+
{ok, Data} when Length =:= 0 orelse byte_size(Data) =:= Length ->
333+
{ok, Data};
334+
{ok, Data} ->
335+
case ?MODULE:getopt(Socket, {socket, type}) of
336+
{ok, stream} ->
337+
{error, {timeout, Data}};
338+
{ok, dgram} ->
339+
{ok, Data}
340+
end
341+
end.
342+
343+
recv0(Socket, Length, Timeout) ->
305344
Ref = erlang:make_ref(),
306-
?TRACE("select read for recv. self=~p ref=~p~n", [self(), Ref]),
307345
case ?MODULE:nif_select_read(Socket, Ref) of
308346
ok ->
309347
receive
310-
{select, _AcceptedSocket, Ref, ready_input} ->
348+
{'$socket', Socket, select, Ref} ->
311349
case ?MODULE:nif_recv(Socket, Length) of
312350
{error, _} = E ->
313351
?MODULE:nif_select_stop(Socket),
314352
E;
315-
% TODO: Assemble data to have more if Length > byte_size(Data)
316-
% as long as timeout did not expire
317353
{ok, Data} ->
318354
{ok, Data}
319355
end;
320-
{closed, Ref} ->
356+
{'$socket', Socket, abort, {Ref, closed}} ->
321357
% socket was closed by another process
322358
% TODO: see above in accept/2
323359
{error, closed}
@@ -328,6 +364,72 @@ recv(Socket, Length, Timeout) ->
328364
Error
329365
end.
330366

367+
recv0_nowait(Socket, Length, Ref) ->
368+
case ?MODULE:nif_recv(Socket, Length) of
369+
{error, timeout} ->
370+
case ?MODULE:nif_select_read(Socket, Ref) of
371+
ok ->
372+
{select, {select_info, recv, Ref}};
373+
{error, _} = Error1 ->
374+
Error1
375+
end;
376+
{error, _} = E ->
377+
E;
378+
{ok, Data} when byte_size(Data) < Length ->
379+
case ?MODULE:getopt(Socket, {socket, type}) of
380+
{ok, stream} ->
381+
case ?MODULE:nif_select_read(Socket, Ref) of
382+
ok ->
383+
{select, {{select_info, recv, Ref}, Data}};
384+
{error, _} = Error1 ->
385+
Error1
386+
end;
387+
{ok, dgram} ->
388+
{ok, Data}
389+
end;
390+
{ok, Data} ->
391+
{ok, Data}
392+
end.
393+
394+
recv0_r(Socket, Length, Timeout, EndQuery, Acc) ->
395+
Ref = erlang:make_ref(),
396+
case ?MODULE:nif_select_read(Socket, Ref) of
397+
ok ->
398+
receive
399+
{'$socket', Socket, select, Ref} ->
400+
case ?MODULE:nif_recv(Socket, Length) of
401+
{error, _} = E ->
402+
?MODULE:nif_select_stop(Socket),
403+
E;
404+
{ok, Data} ->
405+
NewAcc = [Data | Acc],
406+
Remaining = Length - byte_size(Data),
407+
case Remaining of
408+
0 ->
409+
{ok, list_to_binary(lists:reverse(NewAcc))};
410+
_ ->
411+
NewTimeout =
412+
case Timeout of
413+
infinity -> infinity;
414+
_ -> EndQuery - erlang:system_time(millisecond)
415+
end,
416+
recv0_r(Socket, Remaining, NewTimeout, EndQuery, NewAcc)
417+
end
418+
end;
419+
{'$socket', Socket, abort, {Ref, closed}} ->
420+
% socket was closed by another process
421+
% TODO: see above in accept/2
422+
{error, closed}
423+
after Timeout ->
424+
case Acc of
425+
[] -> {error, timeout};
426+
_ -> {error, {timeout, list_to_binary(lists:reverse(Acc))}}
427+
end
428+
end;
429+
{error, _Reason} = Error ->
430+
Error
431+
end.
432+
331433
%%-----------------------------------------------------------------------------
332434
%% @equiv socket:recvfrom(Socket, 0)
333435
%% @end
@@ -370,25 +472,43 @@ recvfrom(Socket, Length) ->
370472
%% bytes are available and return these bytes.
371473
%% @end
372474
%%-----------------------------------------------------------------------------
373-
-spec recvfrom(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
374-
{ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}.
475+
-spec recvfrom(
476+
Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
477+
) ->
478+
{ok, {Address :: sockaddr(), Data :: binary()}}
479+
| {select, {select_info, recvfrom, reference()}}
480+
| {error, Reason :: term()}.
481+
recvfrom(Socket, Length, 0) ->
482+
recvfrom0_noselect(Socket, Length);
483+
recvfrom(Socket, Length, nowait) ->
484+
recvfrom0_nowait(Socket, Length, erlang:make_ref());
485+
recvfrom(Socket, Length, Ref) when is_reference(Ref) ->
486+
recvfrom0_nowait(Socket, Length, Ref);
375487
recvfrom(Socket, Length, Timeout) ->
488+
recvfrom0(Socket, Length, Timeout).
489+
490+
recvfrom0_noselect(Socket, Length) ->
491+
case ?MODULE:nif_recvfrom(Socket, Length) of
492+
{error, _} = E ->
493+
E;
494+
{ok, {_Address, _Data}} = Reply ->
495+
Reply
496+
end.
497+
498+
recvfrom0(Socket, Length, Timeout) ->
376499
Ref = erlang:make_ref(),
377-
?TRACE("select read for recvfrom. self=~p ref=~p", [self(), Ref]),
378500
case ?MODULE:nif_select_read(Socket, Ref) of
379501
ok ->
380502
receive
381-
{select, _AcceptedSocket, Ref, ready_input} ->
503+
{'$socket', Socket, select, Ref} ->
382504
case ?MODULE:nif_recvfrom(Socket, Length) of
383505
{error, _} = E ->
384506
?MODULE:nif_select_stop(Socket),
385507
E;
386-
% TODO: Assemble data to have more if Length > byte_size(Data)
387-
% as long as timeout did not expire
388-
{ok, {Address, Data}} ->
389-
{ok, {Address, Data}}
508+
{ok, {_Address, _Data}} = Reply ->
509+
Reply
390510
end;
391-
{closed, Ref} ->
511+
{'$socket', Socket, abort, {Ref, closed}} ->
392512
% socket was closed by another process
393513
% TODO: see above in accept/2
394514
{error, closed}
@@ -399,6 +519,21 @@ recvfrom(Socket, Length, Timeout) ->
399519
Error
400520
end.
401521

522+
recvfrom0_nowait(Socket, Length, Ref) ->
523+
case ?MODULE:nif_recvfrom(Socket, Length) of
524+
{error, timeout} ->
525+
case ?MODULE:nif_select_read(Socket, Ref) of
526+
ok ->
527+
{select, {select_info, recvfrom, Ref}};
528+
{error, _} = SelectError ->
529+
SelectError
530+
end;
531+
{error, _} = RecvError ->
532+
RecvError;
533+
{ok, {_Address, _Data}} = Reply ->
534+
Reply
535+
end.
536+
402537
%%-----------------------------------------------------------------------------
403538
%% @param Socket the socket
404539
%% @param Data the data to send

libs/estdlib/src/ssl.erl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,9 @@ handshake_loop(SSLContext, Socket) ->
189189
case socket:nif_select_read(Socket, Ref) of
190190
ok ->
191191
receive
192-
{select, _SocketResource, Ref, ready_input} ->
192+
{'$socket', Socket, select, Ref} ->
193193
handshake_loop(SSLContext, Socket);
194-
{closed, Ref} ->
194+
{'$socket', Socket, abort, {Ref, closed}} ->
195195
ok = socket:close(Socket),
196196
{error, closed}
197197
end;
@@ -242,9 +242,9 @@ close_notify_loop(SSLContext, Socket) ->
242242
case socket:nif_select_read(Socket, Ref) of
243243
ok ->
244244
receive
245-
{select, _SocketResource, Ref, ready_input} ->
245+
{'$socket', Socket, select, Ref} ->
246246
close_notify_loop(SSLContext, Socket);
247-
{closed, Ref} ->
247+
{'$socket', Socket, abort, {Ref, closed}} ->
248248
ok = socket:close(Socket),
249249
{error, closed}
250250
end;
@@ -274,9 +274,9 @@ send({SSLContext, Socket} = SSLSocket, Binary) ->
274274
case socket:nif_select_read(Socket, Ref) of
275275
ok ->
276276
receive
277-
{select, _SocketResource, Ref, ready_input} ->
277+
{'$socket', Socket, select, Ref} ->
278278
send(SSLSocket, Binary);
279-
{closed, Ref} ->
279+
{'$socket', Socket, abort, {Ref, closed}} ->
280280
{error, closed}
281281
end;
282282
{error, _Reason} = Error ->
@@ -309,9 +309,9 @@ recv0({SSLContext, Socket} = SSLSocket, Length, Remaining, Acc) ->
309309
case socket:nif_select_read(Socket, Ref) of
310310
ok ->
311311
receive
312-
{select, _SocketResource, Ref, ready_input} ->
312+
{'$socket', Socket, select, Ref} ->
313313
recv0(SSLSocket, Length, Remaining, Acc);
314-
{closed, Ref} ->
314+
{'$socket', Socket, abort, {Ref, closed}} ->
315315
{error, closed}
316316
end;
317317
{error, _Reason} = Error ->

src/libAtomVM/defaultatoms.def

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,9 @@ X(SHUTDOWN_ATOM, "\x8", "shutdown")
170170

171171
X(NONODE_AT_NOHOST_ATOM, "\xD", "nonode@nohost")
172172
X(NET_KERNEL_ATOM, "\xA", "net_kernel")
173+
174+
X(DOLLAR_SOCKET_ATOM, "\x7", "$socket")
175+
X(ABORT_ATOM, "\x5", "abort")
176+
X(FAMILY_ATOM, "\x6", "family")
177+
X(INET_ATOM, "\x4", "inet")
178+
X(TIMEOUT_ATOM, "\x7", "timeout")

src/libAtomVM/erl_nif.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,32 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj);
215215
*/
216216
int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref);
217217

218+
/**
219+
* @brief Variant of `enif_select` where sent message is `msg` instead of default.
220+
*
221+
* @param env current environment
222+
* @param event event object (typically a file descriptor)
223+
* @param obj resource object working as a container of the event object.
224+
* @param pid process id to send a message to or NULL to use the current process (from `env`)
225+
* @param msg message to send (copied).
226+
* @param msg_env must be NULL.
227+
* @return a negative value on failure, 0 or flags on success.
228+
*/
229+
int enif_select_read(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env);
230+
231+
/**
232+
* @brief Variant of `enif_select` where sent message is `msg` instead of default.
233+
*
234+
* @param env current environment
235+
* @param event event object (typically a file descriptor)
236+
* @param obj resource object working as a container of the event object.
237+
* @param pid process id to send a message to or NULL to use the current process (from `env`)
238+
* @param msg message to send (copied).
239+
* @param msg_env must be NULL.
240+
* @return a negative value on failure, 0 or flags on success.
241+
*/
242+
int enif_select_write(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env);
243+
218244
/**
219245
* @brief Monitor a process by using a resource object.
220246
* @details The monitor is automatically removed after being triggered or if the

0 commit comments

Comments
 (0)