diff --git a/modules/ggipcd/src/ipc_server.c b/modules/ggipcd/src/ipc_server.c index eca4be9f1..cfaadcb64 100644 --- a/modules/ggipcd/src/ipc_server.c +++ b/modules/ggipcd/src/ipc_server.c @@ -410,6 +410,16 @@ static GglError handle_operation( return GGL_ERR_INVALID; } + if ((common_headers.message_flags & EVENTSTREAM_TERMINATE_STREAM) != 0) { + GGL_LOGD( + "Termination requested of stream %d for %d.", + common_headers.stream_id, + handle + ); + ggl_ipc_terminate_stream(handle, common_headers.stream_id); + return GGL_ERR_OK; + } + GGL_LOGD( "Handling operation on stream %d for %d.", common_headers.stream_id, diff --git a/modules/ggipcd/src/ipc_subscriptions.c b/modules/ggipcd/src/ipc_subscriptions.c index ae854a21e..d77915ca0 100644 --- a/modules/ggipcd/src/ipc_subscriptions.c +++ b/modules/ggipcd/src/ipc_subscriptions.c @@ -184,3 +184,23 @@ GglError ggl_ipc_release_subscriptions_for_conn(uint32_t resp_handle) { return GGL_ERR_OK; } + +void ggl_ipc_terminate_stream(uint32_t resp_handle, int32_t stream_id) { + for (size_t i = 0; i < GGL_COREBUS_CLIENT_MAX_SUBSCRIPTIONS; i++) { + uint32_t recv_handle = 0; + + { + GGL_MTX_SCOPE_GUARD(&subs_state_mtx); + + if ((subs_resp_handle[i] == resp_handle) + && (subs_stream_id[i] == stream_id)) { + recv_handle = subs_recv_handle[i]; + } + } + + if (recv_handle != 0) { + ggl_client_sub_close(recv_handle); + return; + } + } +} diff --git a/modules/ggipcd/src/ipc_subscriptions.h b/modules/ggipcd/src/ipc_subscriptions.h index 0f8e39f00..74f254c82 100644 --- a/modules/ggipcd/src/ipc_subscriptions.h +++ b/modules/ggipcd/src/ipc_subscriptions.h @@ -30,4 +30,7 @@ GglError ggl_ipc_bind_subscription( /// Clean up subscriptions for an IPC client GglError ggl_ipc_release_subscriptions_for_conn(uint32_t resp_handle); +/// Cleans up subscription associated with an IPC client's stream +void ggl_ipc_terminate_stream(uint32_t resp_handle, int32_t stream_id); + #endif