Skip to content

Commit a6add4b

Browse files
committed
Add support for asynchronous socket:accept/2 API
- Also move reference count logic related to monitor to resources.c - Also remove unnecessary stop handler for BSD sockets as demonitor is handled elsewhere, and stop handler can be called from a platform select loop, creating possible race conditions. Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent 095e61a commit a6add4b

File tree

9 files changed

+165
-98
lines changed

9 files changed

+165
-98
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2424
- Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms
2525
- Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms.
2626
- Added `erl_epmd` client implementation to epmd using `socket` module
27-
- Added support for socket asynchronous API for `recv` and `recvfrom`.
27+
- Added support for socket asynchronous API for `recv`, `recvfrom` and `accept`.
2828

2929
### Changed
3030

libs/estdlib/src/socket.erl

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -229,46 +229,77 @@ accept(Socket) ->
229229
%% be set to listen for connections.
230230
%%
231231
%% Note that this function will block until a connection is made
232-
%% from a client. Typically, users will spawn a call to `accept'
233-
%% in a separate process.
232+
%% from a client, unless `nowait' or a reference is passed as `Timeout'.
233+
%% Typically, users will spawn a call to `accept' in a separate process.
234234
%%
235235
%% Example:
236236
%%
237237
%% `{ok, ConnectedSocket} = socket:accept(ListeningSocket)'
238238
%% @end
239239
%%-----------------------------------------------------------------------------
240-
-spec accept(Socket :: socket(), Timeout :: timeout()) ->
241-
{ok, Connection :: socket()} | {error, Reason :: term()}.
240+
-spec accept(Socket :: socket(), Timeout :: timeout() | nowait | reference()) ->
241+
{ok, Connection :: socket()}
242+
| {select, {select_info, accept, reference()}}
243+
| {error, Reason :: term()}.
244+
accept(Socket, 0) ->
245+
accept0_noselect(Socket);
246+
accept(Socket, nowait) ->
247+
accept0_nowait(Socket, erlang:make_ref());
248+
accept(Socket, Ref) when is_reference(Ref) ->
249+
accept0_nowait(Socket, Ref);
242250
accept(Socket, Timeout) ->
251+
accept0(Socket, Timeout).
252+
253+
accept0_noselect(Socket) ->
254+
case ?MODULE:nif_accept(Socket) of
255+
{error, _} = E ->
256+
E;
257+
{ok, _Socket} = Reply ->
258+
Reply
259+
end.
260+
261+
accept0(Socket, Timeout) ->
243262
Ref = erlang:make_ref(),
244-
?TRACE("select read for accept. self=~p ref=~p~n", [self(), Ref]),
245263
case ?MODULE:nif_select_read(Socket, Ref) of
246264
ok ->
247265
receive
248266
{'$socket', Socket, select, Ref} ->
249267
case ?MODULE:nif_accept(Socket) of
250-
{error, closed} = E ->
268+
{error, _} = E ->
251269
?MODULE:nif_select_stop(Socket),
252270
E;
253-
R ->
254-
R
271+
{ok, _Socket} = Reply ->
272+
Reply
255273
end;
256274
{'$socket', Socket, abort, {Ref, closed}} ->
257275
% socket was closed by another process
258276
% TODO: we need to handle:
259277
% (a) SELECT_STOP being scheduled
260278
% (b) flush of messages as we can have both in the
261279
% queue
262-
{error, closed};
263-
Other ->
264-
{error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
280+
{error, closed}
265281
after Timeout ->
266282
{error, timeout}
267283
end;
268284
{error, _Reason} = Error ->
269285
Error
270286
end.
271287

288+
accept0_nowait(Socket, Ref) ->
289+
case ?MODULE:nif_accept(Socket) of
290+
{error, eagain} ->
291+
case ?MODULE:nif_select_read(Socket, Ref) of
292+
ok ->
293+
{select, {select_info, accept, Ref}};
294+
{error, _} = SelectError ->
295+
SelectError
296+
end;
297+
{error, _} = RecvError ->
298+
RecvError;
299+
{ok, _Socket} = Reply ->
300+
Reply
301+
end.
302+
272303
%%-----------------------------------------------------------------------------
273304
%% @equiv socket:recv(Socket, 0)
274305
%% @end

src/libAtomVM/context.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ void context_destroy(Context *ctx)
157157
void *resource = term_to_term_ptr(monitor->monitor_obj);
158158
struct RefcBinary *refc = refc_binary_from_data(resource);
159159
refc->resource_type->down(erl_nif_env_from_context(ctx), resource, &ctx->process_id, &monitor->ref_ticks);
160+
refc_binary_decrement_refcount(refc, ctx->global);
160161
free(monitor);
161162
}
162163
}

src/libAtomVM/otp_socket.c

Lines changed: 20 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -280,26 +280,6 @@ static void socket_dtor(ErlNifEnv *caller_env, void *obj)
280280
#endif
281281
}
282282

283-
#if OTP_SOCKET_BSD
284-
static void socket_stop(ErlNifEnv *caller_env, void *obj, ErlNifEvent event, int is_direct_call)
285-
{
286-
UNUSED(caller_env);
287-
UNUSED(event);
288-
UNUSED(is_direct_call);
289-
290-
struct SocketResource *rsrc_obj = (struct SocketResource *) obj;
291-
292-
if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
293-
enif_demonitor_process(caller_env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
294-
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
295-
struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
296-
refc_binary_decrement_refcount(rsrc_refc, caller_env->global);
297-
}
298-
299-
TRACE("socket_stop called on fd=%i\n", rsrc_obj->fd);
300-
}
301-
#endif
302-
303283
static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNifMonitor *mon)
304284
{
305285
UNUSED(caller_env);
@@ -314,24 +294,20 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif
314294
TRACE("socket_down called on process_id=%i\n", (int) *pid);
315295
#endif
316296

317-
struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
318297
SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock);
319298

320299
if (rsrc_obj->selecting_process_id == INVALID_PROCESS_ID) {
321300
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
322301
return;
323302
}
303+
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
324304

325305
#if OTP_SOCKET_BSD
326-
// Monitor fired, so make sure we don't try to demonitor in select_stop
327-
// as it could crash trying to reacquire lock on process table
328-
// enif_select can decrement ref count but it's at least 2 in this case (1 for monitor and 1 for select)
329-
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
306+
// enif_select can decrement ref count but it's at least 2 here (1 for monitor and 1 for select)
330307
enif_select(caller_env, rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil());
331308
#elif OTP_SOCKET_LWIP
332309
// Monitor can be called when we're selecting, accepting or connecting.
333310
LWIP_BEGIN();
334-
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
335311
if (rsrc_obj->socket_state & SocketStateTCP) {
336312
if (rsrc_obj->socket_state & SocketStateTCPListening) {
337313
(void) tcp_close(rsrc_obj->tcp_pcb);
@@ -347,21 +323,13 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif
347323
}
348324
LWIP_END();
349325
#endif
350-
351326
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
352-
353-
// We're no longer monitoring so we can decrement ref count
354-
refc_binary_decrement_refcount(rsrc_refc, caller_env->global);
355327
}
356328

357329
static const ErlNifResourceTypeInit SocketResourceTypeInit = {
358330
.members = 3,
359331
.dtor = socket_dtor,
360-
#if OTP_SOCKET_BSD
361-
.stop = socket_stop,
362-
#else
363332
.stop = NULL,
364-
#endif
365333
.down = socket_down,
366334
};
367335

@@ -734,8 +702,11 @@ static term nif_socket_close(Context *ctx, int argc, term argv[])
734702

735703
// So we handle closing a socket while another process is selecting
736704
if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
737-
// Save process id as socket_stop may be called by enif_select.
738-
int32_t selecting_process_id = rsrc_obj->selecting_process_id;
705+
// Another process is selecting, therefore ref_count >= 3
706+
// 1. this caller's context heap (parameter to close)
707+
// 2. select
708+
// 3. monitor
709+
739710
// Stop selecting.
740711
int stop_res = enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil());
741712
if (UNLIKELY(stop_res < 0)) {
@@ -749,13 +720,19 @@ static term nif_socket_close(Context *ctx, int argc, term argv[])
749720
// When using asynchronous API, the selecting process can be the
750721
// calling process. In this case we don't send any notification.
751722
//
752-
if (selecting_process_id != ctx->process_id) {
723+
if (rsrc_obj->selecting_process_id != ctx->process_id) {
753724
// send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid
754-
if (UNLIKELY(send_closed_notification(ctx, argv[0], selecting_process_id, rsrc_obj) < 0)) {
725+
if (UNLIKELY(send_closed_notification(ctx, argv[0], rsrc_obj->selecting_process_id, rsrc_obj) < 0)) {
755726
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
756727
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
757728
}
758729
}
730+
731+
// Stop monitor
732+
enif_demonitor_process(erl_nif_env_from_context(ctx), rsrc_obj, &rsrc_obj->selecting_process_monitor);
733+
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
734+
735+
// Now, ref_count >= 1 only.
759736
}
760737

761738
// Eventually close the socket
@@ -1000,16 +977,13 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
1000977
RAISE_ERROR(BADARG_ATOM);
1001978
}
1002979

1003-
struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
1004980
SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock);
1005981

1006982
ErlNifEnv *env = erl_nif_env_from_context(ctx);
1007983
if (rsrc_obj->selecting_process_id != ctx->process_id && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
1008984
// demonitor can fail if process is gone.
1009985
enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
1010986
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
1011-
// decrement ref count as we are demonitoring
1012-
refc_binary_decrement_refcount(rsrc_refc, ctx->global);
1013987
}
1014988
// Monitor first as select is less likely to fail and it's less expensive to demonitor
1015989
// if select fails than to stop select if monitor fails
@@ -1018,8 +992,6 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
1018992
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
1019993
RAISE_ERROR(NOPROC_ATOM);
1020994
}
1021-
// increment ref count so the resource doesn't go away until monitor is fired
1022-
refc_binary_increment_refcount(rsrc_refc);
1023995
rsrc_obj->selecting_process_id = ctx->process_id;
1024996
}
1025997

@@ -1042,7 +1014,6 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
10421014
enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
10431015
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
10441016
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
1045-
refc_binary_decrement_refcount(rsrc_refc, ctx->global);
10461017
RAISE_ERROR(BADARG_ATOM);
10471018
}
10481019
}
@@ -1087,9 +1058,9 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
10871058
break;
10881059
default:
10891060
enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
1061+
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
10901062
LWIP_END();
10911063
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
1092-
refc_binary_decrement_refcount(rsrc_refc, ctx->global);
10931064
RAISE_ERROR(BADARG_ATOM);
10941065
}
10951066
LWIP_END();
@@ -1115,11 +1086,10 @@ static term nif_socket_select_stop(Context *ctx, int argc, term argv[])
11151086
if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
11161087
enif_demonitor_process(erl_nif_env_from_context(ctx), rsrc_obj, &rsrc_obj->selecting_process_monitor);
11171088
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
1118-
struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
1119-
refc_binary_decrement_refcount(rsrc_refc, ctx->global);
11201089
}
11211090
#if OTP_SOCKET_BSD
11221091
if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()) < 0)) {
1092+
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
11231093
RAISE_ERROR(BADARG_ATOM);
11241094
}
11251095
#elif OTP_SOCKET_LWIP
@@ -1759,8 +1729,10 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
17591729
int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen);
17601730
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
17611731
if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) {
1762-
AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd);
17631732
int err = errno;
1733+
if (err != EAGAIN) {
1734+
AVM_LOGI(TAG, "Unable to accept on socket %i. errno=%i", rsrc_obj->fd, (int) err);
1735+
}
17641736
term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global);
17651737
return make_error_tuple(reason, ctx);
17661738
} else {

src/libAtomVM/refc_binary.c

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,6 @@ void refc_binary_destroy(struct RefcBinary *refc, struct GlobalContext *global)
7878
UNUSED(global);
7979

8080
if (refc->resource_type) {
81-
if (refc->resource_type->down) {
82-
// There may be monitors associated with this resource.
83-
destroy_resource_monitors(refc, global);
84-
}
8581
if (refc->resource_type->dtor) {
8682
ErlNifEnv env;
8783
erl_nif_env_partial_init_from_globalcontext(&env, global);

src/libAtomVM/resources.c

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ int enif_monitor_process(ErlNifEnv *env, void *obj, const ErlNifPid *target_pid,
378378

379379
struct ResourceMonitor *monitor = context_resource_monitor(target, obj);
380380
list_append(&resource->resource_type->monitors, &monitor->resource_list_head);
381+
refc_binary_increment_refcount(resource);
381382
globalcontext_get_process_unlock(env->global, target);
382383

383384
if (mon) {
@@ -405,6 +406,7 @@ int enif_demonitor_process(ErlNifEnv *env, void *obj, const ErlNifMonitor *mon)
405406
list_remove(&monitor->resource_list_head);
406407
list_remove(&monitor->base.monitor_list_head);
407408
free(monitor);
409+
refc_binary_decrement_refcount(resource, global);
408410
synclist_unlock(&global->processes_table);
409411
return 0;
410412
}
@@ -415,26 +417,6 @@ int enif_demonitor_process(ErlNifEnv *env, void *obj, const ErlNifMonitor *mon)
415417
return -1;
416418
}
417419

418-
void destroy_resource_monitors(struct RefcBinary *resource, GlobalContext *global)
419-
{
420-
struct ListHead *processes_table_list = synclist_wrlock(&global->processes_table);
421-
UNUSED(processes_table_list);
422-
term monitor_obj = ((term) resource->data) | TERM_BOXED_VALUE_TAG;
423-
424-
struct ListHead *item;
425-
struct ListHead *tmp;
426-
MUTABLE_LIST_FOR_EACH (item, tmp, &resource->resource_type->monitors) {
427-
struct ResourceMonitor *monitor = GET_LIST_ENTRY(item, struct ResourceMonitor, resource_list_head);
428-
if (monitor->base.monitor_obj == monitor_obj) {
429-
list_remove(&monitor->resource_list_head);
430-
list_remove(&monitor->base.monitor_list_head);
431-
free(monitor);
432-
}
433-
}
434-
435-
synclist_unlock(&global->processes_table);
436-
}
437-
438420
int enif_compare_monitors(const ErlNifMonitor *monitor1, const ErlNifMonitor *monitor2)
439421
{
440422
uint64_t ref_ticks1 = *monitor1;

src/libAtomVM/resources.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,6 @@ bool select_event_notify(ErlNifEvent event, bool is_read, bool is_write, GlobalC
119119
*/
120120
void select_event_count_and_destroy_closed(struct ListHead *select_events, size_t *read, size_t *write, size_t *either, GlobalContext *global);
121121

122-
/**
123-
* @brief Destroy monitors associated with a resource.
124-
*
125-
* @param resource resource to destroy monitors for
126-
* @param global the global context
127-
*/
128-
void destroy_resource_monitors(struct RefcBinary *resource, GlobalContext *global);
129-
130122
#define SELECT_EVENT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + REF_SIZE + TERM_BOXED_RESOURCE_SIZE)
131123

132124
/**

0 commit comments

Comments
 (0)