Skip to content

Commit a639cbc

Browse files
committed
Use signals for link/unlink/monitor/demonitor
Fix concurrency issues and memory leaks related to a poor usage of list of monitors that was accessed by other processes (fix #1384) Fix semantics of monitor where self cannot monitor itself and a monitor cannot be removed from another process. Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent 7607279 commit a639cbc

File tree

11 files changed

+458
-150
lines changed

11 files changed

+458
-150
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ and a race condition in otp_socket code
5656
- Fixed a bug where calling repeatedly `process_info` on a stopped process could cause an out of
5757
memory error
5858
- Fixed possible concurrency problems in ESP32 UART driver
59+
- Fixed concurrency and memory leak related to links and monitors
5960

6061
### Changed
6162

src/libAtomVM/context.c

Lines changed: 118 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
#define DEFAULT_STACK_SIZE 8
5353
#define BYTES_PER_TERM (TERM_BITS / 8)
5454

55-
static struct ResourceMonitor *context_monitors_handle_terminate(Context *ctx);
55+
static struct Monitor *context_monitors_handle_terminate(Context *ctx);
5656
static void destroy_extended_registers(Context *ctx, unsigned int live);
5757

5858
Context *context_new(GlobalContext *glb)
@@ -133,27 +133,54 @@ void context_destroy(Context *ctx)
133133
// Ensure process is not registered
134134
globalcontext_maybe_unregister_process_id(ctx->global, ctx->process_id);
135135

136+
// Process any link/unlink/monitor/demonitor signal that arrived recently
137+
// Also process ProcessInfoRequestSignal so caller isn't trapped waiting
138+
MailboxMessage *signal_message = mailbox_process_outer_list(&ctx->mailbox);
139+
while (signal_message) {
140+
if (signal_message->type == ProcessInfoRequestSignal) {
141+
struct BuiltInAtomRequestSignal *request_signal
142+
= CONTAINER_OF(signal_message, struct BuiltInAtomRequestSignal, base);
143+
context_process_process_info_request_signal(ctx, request_signal);
144+
} else if (signal_message->type == MonitorSignal) {
145+
struct MonitorPointerSignal *monitor_signal
146+
= CONTAINER_OF(signal_message, struct MonitorPointerSignal, base);
147+
context_add_monitor(ctx, monitor_signal->monitor);
148+
} else if (signal_message->type == UnlinkSignal) {
149+
struct ImmediateSignal *immediate_signal
150+
= CONTAINER_OF(signal_message, struct ImmediateSignal, base);
151+
context_unlink(ctx, immediate_signal->immediate);
152+
} else if (signal_message->type == DemonitorSignal) {
153+
struct RefSignal *ref_signal
154+
= CONTAINER_OF(signal_message, struct RefSignal, base);
155+
context_demonitor(ctx, ref_signal->ref_ticks);
156+
}
157+
MailboxMessage *next = signal_message->next;
158+
mailbox_message_dispose(signal_message, &ctx->heap);
159+
signal_message = next;
160+
}
161+
136162
// When monitor message is sent, process is no longer in the table
137163
// and is no longer registered either.
138-
struct ResourceMonitor *resource_monitor = context_monitors_handle_terminate(ctx);
164+
struct Monitor *resource_monitors = context_monitors_handle_terminate(ctx);
139165

140166
synclist_unlock(&ctx->global->processes_table);
141167

142168
// Eventually call resource monitors handlers after the processes table was unlocked
143169
// The monitors were removed from the list of monitors.
144-
if (resource_monitor) {
170+
if (resource_monitors) {
145171
ErlNifEnv env;
146172
erl_nif_env_partial_init_from_globalcontext(&env, ctx->global);
147173

148174
struct ListHead monitors;
149-
list_prepend(&resource_monitor->base.monitor_list_head, &monitors);
175+
list_prepend(&resource_monitors->monitor_list_head, &monitors);
150176

151177
struct ListHead *item;
152178
struct ListHead *tmp;
153179
MUTABLE_LIST_FOR_EACH (item, tmp, &monitors) {
154180
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
155181
void *resource = term_to_term_ptr(monitor->monitor_obj);
156182
struct RefcBinary *refc = refc_binary_from_data(resource);
183+
resource_type_demonitor(refc->resource_type, monitor->ref_ticks);
157184
refc->resource_type->down(&env, resource, &ctx->process_id, &monitor->ref_ticks);
158185
free(monitor);
159186
}
@@ -199,18 +226,18 @@ void context_process_process_info_request_signal(Context *ctx, struct BuiltInAto
199226
if (context_get_process_info(ctx, NULL, &term_size, signal->atom, NULL)) {
200227
Heap heap;
201228
if (UNLIKELY(memory_init_heap(&heap, term_size) != MEMORY_GC_OK)) {
202-
mailbox_send_built_in_atom_signal(target, TrapExceptionSignal, OUT_OF_MEMORY_ATOM);
229+
mailbox_send_immediate_signal(target, TrapExceptionSignal, OUT_OF_MEMORY_ATOM);
203230
} else {
204231
term ret;
205232
if (context_get_process_info(ctx, &ret, NULL, signal->atom, &heap)) {
206233
mailbox_send_term_signal(target, TrapAnswerSignal, ret);
207234
} else {
208-
mailbox_send_built_in_atom_signal(target, TrapExceptionSignal, ret);
235+
mailbox_send_immediate_signal(target, TrapExceptionSignal, ret);
209236
}
210237
memory_destroy_heap(&heap, ctx->global);
211238
}
212239
} else {
213-
mailbox_send_built_in_atom_signal(target, TrapExceptionSignal, BADARG_ATOM);
240+
mailbox_send_immediate_signal(target, TrapExceptionSignal, BADARG_ATOM);
214241
}
215242
globalcontext_get_process_unlock(ctx->global, target);
216243
} // else: sender died
@@ -375,60 +402,53 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a
375402
return true;
376403
}
377404

378-
static struct ResourceMonitor *context_monitors_handle_terminate(Context *ctx)
405+
static struct Monitor *context_monitors_handle_terminate(Context *ctx)
379406
{
380407
GlobalContext *glb = ctx->global;
381408
struct ListHead *item;
382409
struct ListHead *tmp;
383-
struct ResourceMonitor *result = NULL;
410+
struct Monitor *result = NULL;
384411
MUTABLE_LIST_FOR_EACH (item, tmp, &ctx->monitors_head) {
385412
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
386-
if (monitor->ref_ticks && term_is_boxed(monitor->monitor_obj)) {
387-
// Resource monitor
388-
struct ResourceMonitor *resource_monitor = CONTAINER_OF(monitor, struct ResourceMonitor, base);
389-
// remove the monitor from the list of the resource
390-
list_remove(&resource_monitor->resource_list_head);
391-
list_init(&resource_monitor->resource_list_head);
413+
if (monitor->ref_ticks && ((monitor->monitor_obj & 0x3) == CONTEXT_MONITOR_RESOURCE_TAG)) {
392414
// remove it from the list we are iterating on
393415
if (result == NULL) {
394-
list_init(&resource_monitor->base.monitor_list_head);
395-
result = resource_monitor;
416+
list_init(&monitor->monitor_list_head);
417+
result = monitor;
396418
} else {
397-
list_append(&result->base.monitor_list_head, &resource_monitor->base.monitor_list_head);
419+
list_append(&result->monitor_list_head, &monitor->monitor_list_head);
398420
}
399-
} else {
421+
} else if (monitor->ref_ticks == 0 || ((monitor->monitor_obj & 0x3) == CONTEXT_MONITOR_MONITORED_PID_TAG)) {
422+
// term_to_local_process_id and monitor->monitor_obj >> 4 are identical here.
400423
int local_process_id = term_to_local_process_id(monitor->monitor_obj);
401424
Context *target = globalcontext_get_process_nolock(glb, local_process_id);
402425
if (IS_NULL_PTR(target)) {
403-
// TODO: we should scan for existing monitors when a context is destroyed
404-
// otherwise memory might be wasted for long living processes
426+
// TODO: assess whether this can happen
405427
free(monitor);
406428
continue;
407429
}
408430

409-
if (monitor->ref_ticks == 0 && (ctx->exit_reason != NORMAL_ATOM || target->trap_exit)) {
431+
if (monitor->ref_ticks == 0) {
432+
term exited_pid = term_from_local_process_id(ctx->process_id);
433+
mailbox_send_immediate_signal(target, UnlinkSignal, exited_pid);
410434
if (target->trap_exit) {
411435
if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) {
412436
// TODO: handle out of memory here
413437
fprintf(stderr, "Cannot handle out of memory.\n");
414438
globalcontext_get_process_unlock(glb, target);
415439
AVM_ABORT();
416440
}
417-
418-
term exited_pid = term_from_local_process_id(ctx->process_id);
419-
// Process table should be locked before context_unlink is
420-
// called. This is done in calling function context_destroy.
421-
context_unlink(target, exited_pid);
422441
// Prepare the message on ctx's heap which will be freed afterwards.
423442
term info_tuple = term_alloc_tuple(3, &ctx->heap);
424443
term_put_tuple_element(info_tuple, 0, EXIT_ATOM);
425444
term_put_tuple_element(info_tuple, 1, exited_pid);
426445
term_put_tuple_element(info_tuple, 2, ctx->exit_reason);
427446
mailbox_send(target, info_tuple);
428-
} else {
447+
} else if (ctx->exit_reason != NORMAL_ATOM) {
429448
mailbox_send_term_signal(target, KillSignal, ctx->exit_reason);
430449
}
431-
} else if (monitor->ref_ticks) {
450+
} else {
451+
mailbox_send_ref_signal(target, DemonitorSignal, monitor->ref_ticks);
432452
int required_terms = REF_SIZE + TUPLE_SIZE(5);
433453
if (UNLIKELY(memory_ensure_free(ctx, required_terms) != MEMORY_GC_OK)) {
434454
// TODO: handle out of memory here
@@ -454,63 +474,71 @@ static struct ResourceMonitor *context_monitors_handle_terminate(Context *ctx)
454474
mailbox_send(target, info_tuple);
455475
}
456476
free(monitor);
477+
} else {
478+
// We are the monitoring process.
479+
int local_process_id = monitor->monitor_obj >> 4;
480+
Context *target = globalcontext_get_process_nolock(glb, local_process_id);
481+
mailbox_send_ref_signal(target, DemonitorSignal, monitor->ref_ticks);
482+
free(monitor);
457483
}
458484
}
459485
return result;
460486
}
461487

462-
int context_link(Context *ctx, term link_pid)
488+
struct Monitor *monitor_link_new(term link_pid)
463489
{
464-
struct ListHead *item;
465-
struct Monitor *monitor;
466-
LIST_FOR_EACH (item, &ctx->monitors_head) {
467-
monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
468-
if ((monitor->monitor_obj == link_pid) && (monitor->ref_ticks == 0)) {
469-
return 0;
470-
}
471-
}
472-
monitor = malloc(sizeof(struct Monitor));
490+
struct Monitor *monitor = malloc(sizeof(struct Monitor));
473491
if (IS_NULL_PTR(monitor)) {
474-
return -1;
492+
return NULL;
475493
}
476494
monitor->monitor_obj = link_pid;
477495
monitor->ref_ticks = 0;
478-
list_append(&ctx->monitors_head, &monitor->monitor_list_head);
479496

480-
return 0;
497+
return monitor;
481498
}
482499

483-
uint64_t context_monitor(Context *ctx, term monitor_pid)
500+
struct Monitor *monitor_new(term monitor_pid, uint64_t ref_ticks, bool is_monitoring)
484501
{
485-
uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global);
486-
487502
struct Monitor *monitor = malloc(sizeof(struct Monitor));
488503
if (IS_NULL_PTR(monitor)) {
489-
return 0;
504+
return NULL;
505+
}
506+
int32_t local_process_id = term_to_local_process_id(monitor_pid);
507+
if (is_monitoring) {
508+
monitor->monitor_obj = (local_process_id << 4) | CONTEXT_MONITOR_MONITORING_PID_TAG;
509+
} else {
510+
monitor->monitor_obj = (local_process_id << 4) | CONTEXT_MONITOR_MONITORED_PID_TAG;
490511
}
491-
monitor->monitor_obj = monitor_pid;
492512
monitor->ref_ticks = ref_ticks;
493-
list_append(&ctx->monitors_head, &monitor->monitor_list_head);
494513

495-
return ref_ticks;
514+
return monitor;
496515
}
497516

498-
struct ResourceMonitor *context_resource_monitor(Context *ctx, void *resource)
517+
struct Monitor *monitor_resource_monitor_new(void *resource, uint64_t ref_ticks)
499518
{
500-
uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global);
501-
502-
struct ResourceMonitor *monitor = malloc(sizeof(struct ResourceMonitor));
519+
struct Monitor *monitor = malloc(sizeof(struct Monitor));
503520
if (IS_NULL_PTR(monitor)) {
504521
return NULL;
505522
}
506-
// Not really boxed, but sufficient to distinguish from pids
507-
monitor->base.monitor_obj = ((term) resource) | TERM_BOXED_VALUE_TAG;
508-
monitor->base.ref_ticks = ref_ticks;
509-
list_append(&ctx->monitors_head, &monitor->base.monitor_list_head);
523+
monitor->monitor_obj = ((term) resource) | CONTEXT_MONITOR_RESOURCE_TAG;
524+
monitor->ref_ticks = ref_ticks;
510525

511526
return monitor;
512527
}
513528

529+
void context_add_monitor(Context *ctx, struct Monitor *new_monitor)
530+
{
531+
struct ListHead *item;
532+
LIST_FOR_EACH (item, &ctx->monitors_head) {
533+
struct Monitor *existing = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
534+
if ((existing->monitor_obj == new_monitor->monitor_obj) && (existing->ref_ticks == new_monitor->ref_ticks)) {
535+
free(new_monitor);
536+
return;
537+
}
538+
}
539+
list_append(&ctx->monitors_head, &new_monitor->monitor_list_head);
540+
}
541+
514542
void context_unlink(Context *ctx, term link_pid)
515543
{
516544
struct ListHead *item;
@@ -523,3 +551,36 @@ void context_unlink(Context *ctx, term link_pid)
523551
}
524552
}
525553
}
554+
555+
void context_demonitor(Context *ctx, uint64_t ref_ticks)
556+
{
557+
struct ListHead *item;
558+
LIST_FOR_EACH (item, &ctx->monitors_head) {
559+
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
560+
if (monitor->ref_ticks == ref_ticks) {
561+
list_remove(&monitor->monitor_list_head);
562+
free(monitor);
563+
return;
564+
}
565+
}
566+
}
567+
568+
term context_get_monitor_pid(Context *ctx, uint64_t ref_ticks, bool *is_monitoring)
569+
{
570+
struct ListHead *item;
571+
LIST_FOR_EACH (item, &ctx->monitors_head) {
572+
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
573+
if (monitor->ref_ticks == ref_ticks) {
574+
if ((monitor->monitor_obj & 0x3) == CONTEXT_MONITOR_MONITORED_PID_TAG) {
575+
*is_monitoring = false;
576+
return term_from_local_process_id(monitor->monitor_obj >> 4);
577+
} else if ((monitor->monitor_obj & 0x3) == CONTEXT_MONITOR_MONITORING_PID_TAG) {
578+
*is_monitoring = true;
579+
return term_from_local_process_id(monitor->monitor_obj >> 4);
580+
} else {
581+
return term_invalid_term();
582+
}
583+
}
584+
}
585+
return term_invalid_term();
586+
}

0 commit comments

Comments
 (0)