Skip to content

Commit d4a27d7

Browse files
committed
Add support for distributed links
Also fix external_from_binary to avoid GC-caused corruptions and simplify API Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent f660ded commit d4a27d7

File tree

13 files changed

+821
-249
lines changed

13 files changed

+821
-249
lines changed

src/libAtomVM/context.c

Lines changed: 138 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
#define DEFAULT_STACK_SIZE 8
5353
#define BYTES_PER_TERM (TERM_BITS / 8)
5454

55-
static struct ResourceContextMonitor *context_monitors_handle_terminate(Context *ctx);
55+
static struct Monitor *context_monitors_handle_terminate(Context *ctx);
5656
static void context_distribution_handle_terminate(Context *ctx);
5757
static void destroy_extended_registers(Context *ctx, unsigned int live);
5858

@@ -174,6 +174,22 @@ void context_destroy(Context *ctx)
174174
context_unlink_ack(ctx, immediate_ref_signal->immediate, immediate_ref_signal->ref_ticks);
175175
break;
176176
}
177+
case UnlinkRemoteIDSignal: {
178+
struct TermSignal *term_signal
179+
= CONTAINER_OF(signal_message, struct TermSignal, base);
180+
uint64_t unlink_id = term_maybe_unbox_int64(term_get_tuple_element(term_signal->signal_term, 0));
181+
term remote_pid = term_get_tuple_element(term_signal->signal_term, 1);
182+
context_ack_unlink(ctx, remote_pid, unlink_id, true);
183+
break;
184+
}
185+
case UnlinkRemoteIDAckSignal: {
186+
struct TermSignal *term_signal
187+
= CONTAINER_OF(signal_message, struct TermSignal, base);
188+
uint64_t unlink_id = term_maybe_unbox_int64(term_get_tuple_element(term_signal->signal_term, 0));
189+
term remote_pid = term_get_tuple_element(term_signal->signal_term, 1);
190+
context_unlink_ack(ctx, remote_pid, unlink_id);
191+
break;
192+
}
177193
case DemonitorSignal: {
178194
struct RefSignal *ref_signal
179195
= CONTAINER_OF(signal_message, struct RefSignal, base);
@@ -200,24 +216,44 @@ void context_destroy(Context *ctx)
200216

201217
// When monitor message is sent, process is no longer in the table
202218
// and is no longer registered either.
203-
struct ResourceContextMonitor *resource_monitors = context_monitors_handle_terminate(ctx);
219+
struct Monitor *remaining_monitors = context_monitors_handle_terminate(ctx);
204220

205221
synclist_unlock(&ctx->global->processes_table);
206222

207-
// Eventually call resource monitors handlers after the processes table was unlocked
223+
// Eventually call distribution and resource monitors handlers after the processes table was unlocked
208224
// The monitors were removed from the list of monitors.
209-
if (resource_monitors) {
225+
if (remaining_monitors) {
210226
struct ListHead monitors;
211-
list_prepend(&resource_monitors->monitor.monitor_list_head, &monitors);
227+
list_prepend(&remaining_monitors->monitor_list_head, &monitors);
212228

213229
struct ListHead *item;
214230
struct ListHead *tmp;
215231
MUTABLE_LIST_FOR_EACH (item, tmp, &monitors) {
216232
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
217-
struct ResourceContextMonitor *resource_monitor = CONTAINER_OF(monitor, struct ResourceContextMonitor, monitor);
218-
struct RefcBinary *refc = refc_binary_from_data(resource_monitor->resource_obj);
219-
resource_type_fire_monitor(refc->resource_type, erl_nif_env_from_context(ctx), resource_monitor->resource_obj, ctx->process_id, resource_monitor->ref_ticks);
220-
free(monitor);
233+
switch (monitor->monitor_type) {
234+
case CONTEXT_MONITOR_RESOURCE: {
235+
struct ResourceContextMonitor *resource_monitor = CONTAINER_OF(monitor, struct ResourceContextMonitor, monitor);
236+
struct RefcBinary *refc = refc_binary_from_data(resource_monitor->resource_obj);
237+
resource_type_fire_monitor(refc->resource_type, erl_nif_env_from_context(ctx), resource_monitor->resource_obj, ctx->process_id, resource_monitor->ref_ticks);
238+
free(monitor);
239+
break;
240+
}
241+
case CONTEXT_MONITOR_LINK_REMOTE: {
242+
struct LinkRemoteMonitor *link_monitor = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
243+
// Handle the case of inactive link.
244+
if (link_monitor->unlink_id != UNLINK_ID_LINK_ACTIVE) {
245+
free(monitor);
246+
continue;
247+
}
248+
dist_send_payload_exit(link_monitor, ctx->exit_reason, ctx);
249+
free(monitor);
250+
break;
251+
}
252+
case CONTEXT_MONITOR_LINK_LOCAL:
253+
case CONTEXT_MONITOR_MONITORED_LOCAL:
254+
case CONTEXT_MONITOR_MONITORING_LOCAL:
255+
UNREACHABLE();
256+
}
221257
}
222258
}
223259

@@ -439,17 +475,21 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a
439475
break;
440476
case LINKS_ATOM: {
441477
struct ListHead *item;
442-
size_t links_count = 0;
478+
ret_size = TUPLE_SIZE(2);
443479
LIST_FOR_EACH (item, &ctx->monitors_head) {
444480
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
445481
if (monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
446482
struct LinkLocalMonitor *link = CONTAINER_OF(monitor, struct LinkLocalMonitor, monitor);
447483
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
448-
links_count++;
484+
ret_size += CONS_SIZE;
485+
}
486+
} else if (monitor->monitor_type == CONTEXT_MONITOR_LINK_REMOTE) {
487+
struct LinkRemoteMonitor *link = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
488+
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
489+
ret_size += CONS_SIZE + EXTERNAL_PID_SIZE;
449490
}
450491
}
451492
}
452-
ret_size = TUPLE_SIZE(2) + CONS_SIZE * links_count;
453493
break;
454494
}
455495
default:
@@ -531,6 +571,12 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a
531571
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
532572
list = term_list_prepend(link->link_local_process_id, list, heap);
533573
}
574+
} else if (monitor->monitor_type == CONTEXT_MONITOR_LINK_REMOTE) {
575+
struct LinkRemoteMonitor *link = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
576+
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
577+
term external_pid = term_make_external_process_id(link->node, link->pid_number, link->pid_serial, link->creation, heap);
578+
list = term_list_prepend(external_pid, list, heap);
579+
}
534580
}
535581
}
536582
term_put_tuple_element(ret, 1, list);
@@ -544,25 +590,24 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a
544590
return true;
545591
}
546592

547-
static struct ResourceContextMonitor *context_monitors_handle_terminate(Context *ctx)
593+
static struct Monitor *context_monitors_handle_terminate(Context *ctx)
548594
{
549595
GlobalContext *glb = ctx->global;
550596
struct ListHead *item;
551597
struct ListHead *tmp;
552-
struct ResourceContextMonitor *result = NULL;
598+
struct Monitor *result = NULL;
553599
MUTABLE_LIST_FOR_EACH (item, tmp, &ctx->monitors_head) {
554600
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
555601
switch (monitor->monitor_type) {
556602
case CONTEXT_MONITOR_RESOURCE: {
557603
// monitor with resource
558604
// remove it from the list we are iterating on and
559605
// add it to the list of resource monitors to handle afterwards
560-
struct ResourceContextMonitor *resource_monitor = CONTAINER_OF(monitor, struct ResourceContextMonitor, monitor);
561606
if (result == NULL) {
562607
list_init(&monitor->monitor_list_head);
563-
result = resource_monitor;
608+
result = monitor;
564609
} else {
565-
list_append(&result->monitor.monitor_list_head, &monitor->monitor_list_head);
610+
list_append(&result->monitor_list_head, &monitor->monitor_list_head);
566611
}
567612
break;
568613
}
@@ -605,6 +650,16 @@ static struct ResourceContextMonitor *context_monitors_handle_terminate(Context
605650
free(monitor);
606651
break;
607652
}
653+
case CONTEXT_MONITOR_LINK_REMOTE: {
654+
// Process it afterwards
655+
if (result == NULL) {
656+
list_init(&monitor->monitor_list_head);
657+
result = monitor;
658+
} else {
659+
list_append(&result->monitor_list_head, &monitor->monitor_list_head);
660+
}
661+
break;
662+
}
608663
case CONTEXT_MONITOR_MONITORED_LOCAL: {
609664
struct MonitorLocalMonitor *monitored_monitor = CONTAINER_OF(monitor, struct MonitorLocalMonitor, monitor);
610665
int32_t local_process_id = term_to_local_process_id(monitored_monitor->monitor_obj);
@@ -651,15 +706,28 @@ static void context_distribution_handle_terminate(Context *ctx)
651706

652707
struct Monitor *monitor_link_new(term link_pid)
653708
{
654-
struct LinkLocalMonitor *monitor = malloc(sizeof(struct LinkLocalMonitor));
655-
if (IS_NULL_PTR(monitor)) {
656-
return NULL;
709+
if (term_is_local_pid_or_port(link_pid)) {
710+
struct LinkLocalMonitor *monitor = malloc(sizeof(struct LinkLocalMonitor));
711+
if (IS_NULL_PTR(monitor)) {
712+
return NULL;
713+
}
714+
monitor->monitor.monitor_type = CONTEXT_MONITOR_LINK_LOCAL;
715+
monitor->unlink_id = UNLINK_ID_LINK_ACTIVE;
716+
monitor->link_local_process_id = link_pid;
717+
return &monitor->monitor;
718+
} else {
719+
struct LinkRemoteMonitor *monitor = malloc(sizeof(struct LinkRemoteMonitor));
720+
if (IS_NULL_PTR(monitor)) {
721+
return NULL;
722+
}
723+
monitor->monitor.monitor_type = CONTEXT_MONITOR_LINK_REMOTE;
724+
monitor->unlink_id = UNLINK_ID_LINK_ACTIVE;
725+
monitor->node = term_get_external_node(link_pid);
726+
monitor->pid_number = term_get_external_pid_process_id(link_pid);
727+
monitor->pid_serial = term_get_external_pid_serial(link_pid);
728+
monitor->creation = term_get_external_node_creation(link_pid);
729+
return &monitor->monitor;
657730
}
658-
monitor->monitor.monitor_type = CONTEXT_MONITOR_LINK_LOCAL;
659-
monitor->unlink_id = UNLINK_ID_LINK_ACTIVE;
660-
monitor->link_local_process_id = link_pid;
661-
662-
return &monitor->monitor;
663731
}
664732

665733
struct Monitor *monitor_new(term monitor_pid, uint64_t ref_ticks, bool is_monitoring)
@@ -727,6 +795,18 @@ bool context_add_monitor(Context *ctx, struct Monitor *new_monitor)
727795
}
728796
break;
729797
}
798+
case CONTEXT_MONITOR_LINK_REMOTE: {
799+
struct LinkRemoteMonitor *new_link_monitor = CONTAINER_OF(new_monitor, struct LinkRemoteMonitor, monitor);
800+
struct LinkRemoteMonitor *existing_link_monitor = CONTAINER_OF(existing, struct LinkRemoteMonitor, monitor);
801+
if (UNLIKELY(existing_link_monitor->node == new_link_monitor->node
802+
&& existing_link_monitor->pid_number == new_link_monitor->pid_number
803+
&& existing_link_monitor->pid_serial == new_link_monitor->pid_serial
804+
&& existing_link_monitor->creation == new_link_monitor->creation)) {
805+
free(new_monitor);
806+
return false;
807+
}
808+
break;
809+
}
730810
}
731811
}
732812
}
@@ -739,7 +819,7 @@ bool context_set_unlink_id(Context *ctx, term link_pid, uint64_t *unlink_id)
739819
struct ListHead *item;
740820
LIST_FOR_EACH (item, &ctx->monitors_head) {
741821
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
742-
if (monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
822+
if (term_is_local_pid_or_port(link_pid) && monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
743823
struct LinkLocalMonitor *link = CONTAINER_OF(monitor, struct LinkLocalMonitor, monitor);
744824
if (link->link_local_process_id == link_pid) {
745825
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
@@ -751,6 +831,21 @@ bool context_set_unlink_id(Context *ctx, term link_pid, uint64_t *unlink_id)
751831
return false;
752832
}
753833
}
834+
} else if (term_is_external_pid(link_pid) && monitor->monitor_type == CONTEXT_MONITOR_LINK_REMOTE) {
835+
struct LinkRemoteMonitor *link = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
836+
if (link->node == term_get_external_node(link_pid)
837+
&& link->pid_number == term_get_external_pid_process_id(link_pid)
838+
&& link->pid_serial == term_get_external_pid_serial(link_pid)
839+
&& link->creation == term_get_external_node_creation(link_pid)) {
840+
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
841+
uint64_t new_id = globalcontext_get_ref_ticks(ctx->global);
842+
link->unlink_id = new_id;
843+
*unlink_id = new_id;
844+
return true;
845+
} else {
846+
return false;
847+
}
848+
}
754849
}
755850
}
756851
return false;
@@ -761,7 +856,7 @@ void context_ack_unlink(Context *ctx, term link_pid, uint64_t unlink_id, bool pr
761856
struct ListHead *item;
762857
LIST_FOR_EACH (item, &ctx->monitors_head) {
763858
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
764-
if (monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
859+
if (term_is_local_pid_or_port(link_pid) && monitor->monitor_type == CONTEXT_MONITOR_LINK_LOCAL) {
765860
struct LinkLocalMonitor *link = CONTAINER_OF(monitor, struct LinkLocalMonitor, monitor);
766861
if (link->link_local_process_id == link_pid) {
767862
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
@@ -785,6 +880,20 @@ void context_ack_unlink(Context *ctx, term link_pid, uint64_t unlink_id, bool pr
785880
}
786881
return;
787882
}
883+
} else if (term_is_external_pid(link_pid) && monitor->monitor_type == CONTEXT_MONITOR_LINK_REMOTE) {
884+
struct LinkRemoteMonitor *link = CONTAINER_OF(monitor, struct LinkRemoteMonitor, monitor);
885+
if (link->node == term_get_external_node(link_pid)
886+
&& link->pid_number == term_get_external_pid_process_id(link_pid)
887+
&& link->pid_serial == term_get_external_pid_serial(link_pid)
888+
&& link->creation == term_get_external_node_creation(link_pid)) {
889+
if (link->unlink_id == UNLINK_ID_LINK_ACTIVE) {
890+
// Send ack and remove link
891+
dist_send_unlink_id_ack(unlink_id, term_from_local_process_id(ctx->process_id), link_pid, ctx);
892+
list_remove(&monitor->monitor_list_head);
893+
free(monitor);
894+
}
895+
return;
896+
}
788897
}
789898
}
790899
}
@@ -833,6 +942,7 @@ void context_demonitor(Context *ctx, uint64_t ref_ticks)
833942
}
834943
}
835944
case CONTEXT_MONITOR_LINK_LOCAL:
945+
case CONTEXT_MONITOR_LINK_REMOTE:
836946
break;
837947
}
838948
}
@@ -854,6 +964,7 @@ term context_get_monitor_pid(Context *ctx, uint64_t ref_ticks, bool *is_monitori
854964
break;
855965
}
856966
case CONTEXT_MONITOR_LINK_LOCAL:
967+
case CONTEXT_MONITOR_LINK_REMOTE:
857968
case CONTEXT_MONITOR_RESOURCE:
858969
break;
859970
}

src/libAtomVM/context.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ enum ContextMonitorType
156156
CONTEXT_MONITOR_MONITORING_LOCAL,
157157
CONTEXT_MONITOR_MONITORED_LOCAL,
158158
CONTEXT_MONITOR_RESOURCE,
159+
CONTEXT_MONITOR_LINK_REMOTE,
159160
};
160161

161162
#define UNLINK_ID_LINK_ACTIVE 0x0
@@ -191,6 +192,16 @@ struct ResourceContextMonitor
191192
void *resource_obj;
192193
};
193194

195+
struct LinkRemoteMonitor
196+
{
197+
struct Monitor monitor;
198+
uint64_t unlink_id;
199+
term node;
200+
uint32_t pid_number;
201+
uint32_t pid_serial;
202+
uint32_t creation;
203+
};
204+
194205
struct ExtendedRegister
195206
{
196207
struct ListHead head;
@@ -456,10 +467,10 @@ bool context_get_process_info(Context *ctx, term *out, size_t *term_size, term a
456467
/**
457468
* @brief Half-link process to another process
458469
*
459-
* @param monitor_pid process to link to
470+
* @param link_pid process to link to (local or remote)
460471
* @return the allocated monitor or NULL if allocation failed
461472
*/
462-
struct Monitor *monitor_link_new(term monitor_pid);
473+
struct Monitor *monitor_link_new(term link_pid);
463474

464475
/**
465476
* @brief Create a monitor on a process.

0 commit comments

Comments
 (0)