From 8d627a072d8d8264f6f306c03b813e5564210c09 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Wed, 9 Jul 2025 12:55:46 +0200 Subject: [PATCH 1/6] feat(mosq): Update brokerless example to work with esp-peer --- ...default-event-to-http-client-handler.patch | 24 ++ .../examples/serverless_mqtt/CMakeLists.txt | 12 + .../Remove-deprecated-freeRTOS-header.patch | 24 ++ .../serverless_mqtt/esp_peer_setup.sh | 27 ++ .../serverless_mqtt/main/CMakeLists.txt | 3 +- .../examples/serverless_mqtt/main/webrtc.c | 311 ++++++++++++++++++ .../serverless_mqtt/sdkconfig.defaults | 3 + 7 files changed, 403 insertions(+), 1 deletion(-) create mode 100644 components/mosquitto/examples/serverless_mqtt/Add-default-event-to-http-client-handler.patch create mode 100644 components/mosquitto/examples/serverless_mqtt/Remove-deprecated-freeRTOS-header.patch create mode 100755 components/mosquitto/examples/serverless_mqtt/esp_peer_setup.sh create mode 100644 components/mosquitto/examples/serverless_mqtt/main/webrtc.c diff --git a/components/mosquitto/examples/serverless_mqtt/Add-default-event-to-http-client-handler.patch b/components/mosquitto/examples/serverless_mqtt/Add-default-event-to-http-client-handler.patch new file mode 100644 index 0000000000..b84e34f1c2 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/Add-default-event-to-http-client-handler.patch @@ -0,0 +1,24 @@ +From a2d51a026449cf1ba2cb53996ce1cdf1f6f078d2 Mon Sep 17 00:00:00 2001 +From: David Cermak +Date: Wed, 9 Jul 2025 12:40:36 +0200 +Subject: [PATCH] fix(htt): Add default event to http-client handler + +--- + components/esp_webrtc/impl/apprtc_signal/https_client.c | 2 ++ + 1 file changed, 2 insertions(+) + +diff --git a/components/esp_webrtc/impl/apprtc_signal/https_client.c b/components/esp_webrtc/impl/apprtc_signal/https_client.c +index b7ee6fc..aca4a4e 100644 +--- a/components/esp_webrtc/impl/apprtc_signal/https_client.c ++++ b/components/esp_webrtc/impl/apprtc_signal/https_client.c +@@ -48,6 +48,8 @@ esp_err_t _http_event_handler(esp_http_client_event_t *evt) + { + http_info_t *info = evt->user_data; + switch (evt->event_id) { ++ default: ++ break; + case HTTP_EVENT_ERROR: + ESP_LOGD(TAG, "HTTP_EVENT_ERROR"); + break; +-- +2.25.1 diff --git a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt index c9935c9567..afd58e7104 100644 --- a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt @@ -3,4 +3,16 @@ cmake_minimum_required(VERSION 3.16) include($ENV{IDF_PATH}/tools/cmake/project.cmake) + +execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup.sh + ${CMAKE_BINARY_DIR} + WORKING_DIRECTORY ${CMAKE_BINARY_DIR} + RESULT_VARIABLE script_result) + +if(script_result) + message(FATAL_ERROR "Script esp_peer_setup.sh failed with exit code ${script_result}") +endif() + +list(APPEND EXTRA_COMPONENT_DIRS "${CMAKE_BINARY_DIR}/esp-peer/components/") + project(serverless_mqtt) diff --git a/components/mosquitto/examples/serverless_mqtt/Remove-deprecated-freeRTOS-header.patch b/components/mosquitto/examples/serverless_mqtt/Remove-deprecated-freeRTOS-header.patch new file mode 100644 index 0000000000..ab5cd3d6bb --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/Remove-deprecated-freeRTOS-header.patch @@ -0,0 +1,24 @@ +From 06b15a30e860daef303b6c6541a2cae858c17e6c Mon Sep 17 00:00:00 2001 +From: David Cermak +Date: Thu, 10 Jul 2025 11:09:57 +0200 +Subject: [PATCH] fix(media_lib): Remove deprecated freeRTOS header + +--- + components/media_lib_sal/port/media_lib_os_freertos.c | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/components/media_lib_sal/port/media_lib_os_freertos.c b/components/media_lib_sal/port/media_lib_os_freertos.c +index d248d59..7b588ca 100644 +--- a/components/media_lib_sal/port/media_lib_os_freertos.c ++++ b/components/media_lib_sal/port/media_lib_os_freertos.c +@@ -40,7 +40,7 @@ + #include "esp_idf_version.h" + + #if CONFIG_FREERTOS_ENABLE_TASK_SNAPSHOT +-#include "freertos/task_snapshot.h" ++#include "esp_private/freertos_debug.h" + #endif + + #ifdef __XTENSA__ +-- +2.43.0 diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup.sh b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup.sh new file mode 100755 index 0000000000..f1f56ce4e7 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +set -e +echo "bin_dir: $1" + +bin_dir="$1" +THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +ESP_PEER_VERSION="9512eef258a45aafcaaa309b1a67505c8b500363" + +ESP_PEER_URL="https://github.com/espressif/esp-webrtc-solution/archive/${ESP_PEER_VERSION}.zip" +ESP_PEER_DIR="${bin_dir}/esp-peer" +ZIP_PATH="${bin_dir}/esp-peer.zip" +EXTRACTED_DIR="${ESP_PEER_DIR}/esp-webrtc-solution-${ESP_PEER_VERSION}" +COMPONENTS_SRC="${EXTRACTED_DIR}/components" +COMPONENTS_DST="${ESP_PEER_DIR}/components" +PATCH_FILE_1="${THIS_DIR}/Add-default-event-to-http-client-handler.patch" +PATCH_FILE_2="${THIS_DIR}/Remove-deprecated-freeRTOS-header.patch" + +# Download if not exists +if [ ! -d "$EXTRACTED_DIR" ]; then + echo "Downloading esp-peer ${ESP_PEER_VERSION}..." + wget -O "$ZIP_PATH" "$ESP_PEER_URL" + unzip -o "$ZIP_PATH" -d "$ESP_PEER_DIR" + patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_1" + patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_2" + mv ${EXTRACTED_DIR}/components ${ESP_PEER_DIR} + mv ${ESP_PEER_DIR}/components/esp_webrtc/impl/peer_default ${ESP_PEER_DIR}/components +fi diff --git a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt index b757b72863..2b04811113 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt @@ -1,4 +1,5 @@ idf_component_register(SRCS "serverless_mqtt.c" "wifi_connect.c" + "webrtc.c" INCLUDE_DIRS "." - REQUIRES libjuice nvs_flash mqtt json esp_wifi) + REQUIRES libjuice nvs_flash mqtt json esp_wifi media_lib_sal esp_webrtc peer_default) diff --git a/components/mosquitto/examples/serverless_mqtt/main/webrtc.c b/components/mosquitto/examples/serverless_mqtt/main/webrtc.c new file mode 100644 index 0000000000..c506e8cf24 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/webrtc.c @@ -0,0 +1,311 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +/* OpenAI realtime communication Demo code + + This example code is in the Public Domain (or CC0 licensed, at your option.) + + Unless required by applicable law or agreed to in writing, this + software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + CONDITIONS OF ANY KIND, either express or implied. +*/ + +#include "media_lib_os.h" +#include "esp_log.h" +#include "esp_webrtc_defaults.h" +#include "esp_peer_default.h" +#include "common.h" +#include "esp_timer.h" +#include "esp_random.h" + + +bool network_is_connected(void); +/** + * @brief Start WebRTC + * + * @param[in] url Signaling url + * + * @return + * - 0 On success + * - Others Fail to start + */ +int start_webrtc(char *url); + +/** + * @brief Query WebRTC Status + */ +void query_webrtc(void); + +/** + * @brief Stop WebRTC + * + * @return + * - 0 On success + * - Others Fail to stop + */ +int stop_webrtc(void); + +#define TAG "PEER_DEMO" + +#define TEST_PERIOD 1000 + +static esp_peer_signaling_handle_t signaling = NULL; +static esp_peer_handle_t peer = NULL; +static esp_timer_handle_t timer; +static int send_sequence = 0; +static bool peer_running = false; + +typedef struct { + const char* question; + const char* answer; +} data_channel_chat_content_t; + +static data_channel_chat_content_t chat_content[] = { + {"Hi!", "Hello!"}, + {"How are you?", "I am fine."}, + {"Wish to be your friend.", "Great!"}, + {"What's your name?", "I am a chatbot, nice to meet you!"}, + {"What do you do?", "I am here to chat and assist you with various tasks."}, + {"How old are you?", "I don't have an age. I was created to chat with you!"}, + {"Do you have hobbies?", "I enjoy chatting with you and learning new things."}, + {"Tell me a story.", "Once upon a time, a curious cat discovered a magical world..."}, + {"Tell me a joke.", "Why don't skeletons fight each other? They don't have the guts!"}, + {"What is the weather like?", "I am not sure, but you can check your local forecast."}, + {"What is your favorite color?", "I don't have a favorite color, but I like all of them!"}, + {"What is the time?", "Sorry, I can't tell the time. You can check your device for that."}, + {"What can you do?", "I can answer questions, tell jokes, help with tasks, and much more!"}, + {"Where are you from?", "I was created by developers, so I don't have a specific location."}, + {"What is love?", "Love is a complex emotion that connects people. What do you think love is?"}, + {"Do you like music?", "I don't listen to music, but I know about it! What's your favorite genre?"}, + {"Goodbye!", "Bye!"}, +}; + +static void send_cb(void *ctx) +{ + if (peer) { + uint8_t data[64]; + memset(data, (uint8_t)send_sequence, sizeof(data)); + esp_peer_audio_frame_t audio_frame = { + .data = data, + .size = sizeof(data), + .pts = send_sequence, + }; + // Send audio data + esp_peer_send_audio(peer, &audio_frame); + send_sequence++; + int question = esp_random() % (sizeof(chat_content) / sizeof(chat_content[0])); + + // Send question through data channel + esp_peer_data_frame_t data_frame = { + .type = ESP_PEER_DATA_CHANNEL_DATA, + .data = (uint8_t*)chat_content[question].question, + .size = strlen(chat_content[question].question) + 1, + }; + ESP_LOGI(TAG, "Send question:%s", (char*)data_frame.data); + esp_peer_send_data(peer, &data_frame); + } +} + +static int peer_state_handler(esp_peer_state_t state, void* ctx) +{ + if (state == ESP_PEER_STATE_CONNECTED) { + if (timer == NULL) { + esp_timer_create_args_t cfg = { + .callback = send_cb, + .name = "send", + }; + esp_timer_create(&cfg, &timer); + if (timer) { + esp_timer_start_periodic(timer, TEST_PERIOD * 1000); + } + } + } else if (state == ESP_PEER_STATE_DISCONNECTED) { + if (timer) { + esp_timer_stop(timer); + esp_timer_delete(timer); + timer = NULL; + } + } + return 0; +} + +static int peer_msg_handler(esp_peer_msg_t* msg, void* ctx) +{ + if (msg->type == ESP_PEER_MSG_TYPE_SDP) { + // Send local SDP to signaling server + esp_peer_signaling_send_msg(signaling, (esp_peer_signaling_msg_t *)msg); + } + return 0; +} + +static int peer_video_info_handler(esp_peer_video_stream_info_t* info, void* ctx) +{ + return 0; +} + +static int peer_audio_info_handler(esp_peer_audio_stream_info_t* info, void* ctx) +{ + return 0; +} + +static int peer_audio_data_handler(esp_peer_audio_frame_t* frame, void* ctx) +{ + ESP_LOGI(TAG, "Audio Sequence %d(%d)", (int)frame->pts, (int)frame->data[0]); + return 0; +} + +static int peer_video_data_handler(esp_peer_video_frame_t* frame, void* ctx) +{ + return 0; +} + +static int peer_data_handler(esp_peer_data_frame_t* frame, void* ctx) +{ + int ans = -1; + for (int i = 0; i < sizeof(chat_content) / sizeof(chat_content[0]); i++) { + if (strcmp((char*)frame->data, chat_content[i].question) == 0) { + ans = i; + break; + } + } + if (ans >= 0) { + ESP_LOGI(TAG, "Get question:%s", (char*)frame->data); + esp_peer_data_frame_t data_frame = { + .type = ESP_PEER_DATA_CHANNEL_DATA, + .data = (uint8_t*)chat_content[ans].answer, + .size = strlen(chat_content[ans].answer) + 1, + }; + ESP_LOGI(TAG, "Send answer:%s", (char*)data_frame.data); + esp_peer_send_data(peer, &data_frame); + } else { + ESP_LOGI(TAG, "Get answer:%s", (char*)frame->data); + } + return 0; +} + +static void pc_task(void *arg) +{ + while (peer_running) { + esp_peer_main_loop(peer); + media_lib_thread_sleep(20); + } + media_lib_thread_destroy(NULL); +} + +static int signaling_ice_info_handler(esp_peer_signaling_ice_info_t* info, void* ctx) +{ + if (peer == NULL) { + esp_peer_default_cfg_t peer_cfg = { + .agent_recv_timeout = 500, + }; + esp_peer_cfg_t cfg = { + .server_lists = &info->server_info, + .server_num = 1, + .audio_dir = ESP_PEER_MEDIA_DIR_SEND_RECV, + .audio_info = { + .codec = ESP_PEER_AUDIO_CODEC_G711A, + }, + .enable_data_channel = true, + .role = info->is_initiator ? ESP_PEER_ROLE_CONTROLLING : ESP_PEER_ROLE_CONTROLLED, + .on_state = peer_state_handler, + .on_msg = peer_msg_handler, + .on_video_info = peer_video_info_handler, + .on_audio_info = peer_audio_info_handler, + .on_video_data = peer_video_data_handler, + .on_audio_data = peer_audio_data_handler, + .on_data = peer_data_handler, + .ctx = ctx, + .extra_cfg = &peer_cfg, + .extra_size = sizeof(esp_peer_default_cfg_t), + }; + int ret = esp_peer_open(&cfg, esp_peer_get_default_impl(), &peer); + if (ret != ESP_PEER_ERR_NONE) { + return ret; + } + media_lib_thread_handle_t thread = NULL; + peer_running = true; + media_lib_thread_create_from_scheduler(&thread, "pc_task", pc_task, NULL); + if (thread == NULL) { + peer_running = false; + } + } + return 0; +} + +static int signaling_connected_handler(void* ctx) +{ + if (peer) { + return esp_peer_new_connection(peer); + } + return 0; +} + +static int signaling_msg_handler(esp_peer_signaling_msg_t* msg, void* ctx) +{ + if (msg->type == ESP_PEER_SIGNALING_MSG_BYE) { + esp_peer_close(peer); + peer = NULL; + } else if (msg->type == ESP_PEER_SIGNALING_MSG_SDP) { + // Receive remote SDP + if (peer) { + esp_peer_send_msg(peer, (esp_peer_msg_t*)msg); + } + } + return 0; +} + +static int signaling_close_handler(void *ctx) +{ + return 0; +} + +static int start_signaling(char* url) +{ + esp_peer_signaling_cfg_t cfg = { + .signal_url = url, + .on_ice_info = signaling_ice_info_handler, + .on_connected = signaling_connected_handler, + .on_msg = signaling_msg_handler, + .on_close = signaling_close_handler, + }; + // Use APPRTC signaling + return esp_peer_signaling_start(&cfg, esp_signaling_get_apprtc_impl(), &signaling); +} + +int start_webrtc(char *url) +{ + if (network_is_connected() == false) { + ESP_LOGE(TAG, "Wifi not connected yet"); + return -1; + } + stop_webrtc(); + return start_signaling(url); +} + +void query_webrtc(void) +{ + if (peer) { + esp_peer_query(peer); + } +} + +int stop_webrtc(void) +{ + peer_running = false; + if (timer) { + esp_timer_stop(timer); + esp_timer_delete(timer); + timer = NULL; + } + if (peer) { + esp_peer_close(peer); + peer = NULL; + } + if (signaling) { + esp_peer_signaling_stop(signaling); + signaling = NULL; + } + return 0; +} diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults b/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults index 037b680153..f0fd9cde6d 100644 --- a/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults @@ -1,3 +1,6 @@ CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y CONFIG_ESP_MAIN_TASK_STACK_SIZE=16384 CONFIG_PTHREAD_TASK_STACK_SIZE_DEFAULT=32768 +CONFIG_LWIP_SNTP_MAX_SERVERS=2 +CONFIG_MBEDTLS_SSL_PROTO_DTLS=y +CONFIG_MBEDTLS_SSL_DTLS_SRTP=y From 25671a66c70c73b3142add0948e3150169dfb215 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Thu, 10 Jul 2025 11:40:19 +0200 Subject: [PATCH 2/6] fix(mosq): Relax CI criteria to build on v5.2+ --- .github/workflows/mosq__build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/mosq__build.yml b/.github/workflows/mosq__build.yml index 0182a9f332..78922ebb61 100644 --- a/.github/workflows/mosq__build.yml +++ b/.github/workflows/mosq__build.yml @@ -13,7 +13,7 @@ jobs: name: Mosquitto build strategy: matrix: - idf_ver: ["latest", "release-v5.5", "release-v5.4", "release-v5.3", "release-v5.2", "release-v5.1"] + idf_ver: ["latest", "release-v5.5", "release-v5.4", "release-v5.3", "release-v5.2"] runs-on: ubuntu-22.04 container: espressif/idf:${{ matrix.idf_ver }} env: From 04e897b6ff1aa2814fb415bdf7c566fd6db7eb09 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Thu, 10 Jul 2025 13:29:06 +0200 Subject: [PATCH 3/6] fix(mosq): Add new ci target for esp-peer --- .../examples/serverless_mqtt/CMakeLists.txt | 5 ++++- .../examples/serverless_mqtt/main/CMakeLists.txt | 11 +++++++++-- .../serverless_mqtt/main/Kconfig.projbuild | 15 +++++++++++++++ .../examples/serverless_mqtt/sdkconfig.ci.default | 2 ++ .../serverless_mqtt/sdkconfig.ci.esp_peer | 3 +++ 5 files changed, 33 insertions(+), 3 deletions(-) create mode 100644 components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default create mode 100644 components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer diff --git a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt index afd58e7104..9d12ff1ecc 100644 --- a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt @@ -4,6 +4,9 @@ cmake_minimum_required(VERSION 3.16) include($ENV{IDF_PATH}/tools/cmake/project.cmake) +# Setup ESP-PEER from GitHub repo (but it's supported only on certain targets) +set(ESP_PEER_COMPATIBLE_TARGETS "esp32s2" "esp32s3" "esp32p4" "esp32") +if(IDF_TARGET IN_LIST ESP_PEER_COMPATIBLE_TARGETS) execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup.sh ${CMAKE_BINARY_DIR} WORKING_DIRECTORY ${CMAKE_BINARY_DIR} @@ -12,7 +15,7 @@ execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup.sh if(script_result) message(FATAL_ERROR "Script esp_peer_setup.sh failed with exit code ${script_result}") endif() - list(APPEND EXTRA_COMPONENT_DIRS "${CMAKE_BINARY_DIR}/esp-peer/components/") +endif() project(serverless_mqtt) diff --git a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt index 2b04811113..81c40bbb3a 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt @@ -1,5 +1,12 @@ +if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER) + set(PEER_BACKEND_SRC "webrtc.c") +endif() + idf_component_register(SRCS "serverless_mqtt.c" "wifi_connect.c" - "webrtc.c" + "${PEER_BACKEND_SRC}" INCLUDE_DIRS "." - REQUIRES libjuice nvs_flash mqtt json esp_wifi media_lib_sal esp_webrtc peer_default) + REQUIRES libjuice nvs_flash mqtt json esp_wifi) +if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER) + idf_component_optional_requires(PUBLIC media_lib_sal esp_webrtc peer_default) +endif() diff --git a/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild index 7e0e97de03..d907bc2b30 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild +++ b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild @@ -82,4 +82,19 @@ menu "Example Configuration" bool "peer2" endchoice + choice EXAMPLE_PEER_LIB + prompt "Choose peer library" + default EXAMPLE_PEER_LIB_LIBJUICE + help + Choose the peer library to use for WebRTC communication. + libjuice: Use libjuice library for ICE/STUN/TURN (Performs manual signalling) + esp_peer: Use ESP-IDF specific peer library + + config EXAMPLE_PEER_LIB_ESP_PEER + bool "esp_peer" + + config EXAMPLE_PEER_LIB_LIBJUICE + bool "libjuice" + endchoice + endmenu diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default new file mode 100644 index 0000000000..4c23f9c136 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default @@ -0,0 +1,2 @@ +CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=n +CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=y diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer new file mode 100644 index 0000000000..dfc8f70dfa --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer @@ -0,0 +1,3 @@ +CONFIG_IDF_TARGET="esp32s3" +CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=y +CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=n From 79b8ab5167129b39ae415025e3515567c88e6e1a Mon Sep 17 00:00:00 2001 From: David Cermak Date: Fri, 11 Jul 2025 13:45:54 +0200 Subject: [PATCH 4/6] fix(mosq): Split the functionality into peer & mqtt --- .../serverless_mqtt/main/CMakeLists.txt | 2 +- .../examples/serverless_mqtt/main/peer_impl.c | 280 ++++++++++++++++++ .../examples/serverless_mqtt/main/peer_impl.h | 18 ++ .../serverless_mqtt/main/serverless_mqtt.c | 275 ++--------------- .../examples/serverless_mqtt/main/webrtc.c | 10 - 5 files changed, 329 insertions(+), 256 deletions(-) create mode 100644 components/mosquitto/examples/serverless_mqtt/main/peer_impl.c create mode 100644 components/mosquitto/examples/serverless_mqtt/main/peer_impl.h diff --git a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt index 81c40bbb3a..037e6aee73 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt @@ -2,7 +2,7 @@ if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER) set(PEER_BACKEND_SRC "webrtc.c") endif() -idf_component_register(SRCS "serverless_mqtt.c" +idf_component_register(SRCS "serverless_mqtt.c" "peer_impl.c" "wifi_connect.c" "${PEER_BACKEND_SRC}" INCLUDE_DIRS "." diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl.c b/components/mosquitto/examples/serverless_mqtt/main/peer_impl.c new file mode 100644 index 0000000000..a7b9b66d36 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl.c @@ -0,0 +1,280 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include +#include "freertos/FreeRTOS.h" +#include "freertos/event_groups.h" +#include "mqtt_client.h" +#include "esp_wifi.h" +#include "esp_log.h" +#include "esp_check.h" +#include "juice/juice.h" +#include "cJSON.h" +#include "peer_impl.h" + +#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1) +#define OUR_PEER "1" +#define THEIR_PEER "2" +#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2) +#define OUR_PEER "2" +#define THEIR_PEER "1" +#endif + +#define PEER_SYNC0 BIT(0) +#define PEER_SYNC1 BIT(1) +#define PEER_SYNC2 BIT(2) +#define PEER_FAIL BIT(3) +#define PEER_GATHER_DONE BIT(4) +#define PEER_DESC_PUBLISHED BIT(5) +#define PEER_CONNECTED BIT(6) + +#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL) + +#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER +#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER +#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN + +static const char *TAG = "serverless_mqtt" OUR_PEER; +static char s_buffer[MAX_BUFFER_SIZE]; +static EventGroupHandle_t s_state = NULL; +static juice_agent_t *s_agent = NULL; +static cJSON *s_peer_desc_json = NULL; +static char *s_peer_desc = NULL; +static esp_mqtt_client_handle_t s_local_mqtt = NULL; +static on_peer_recv_t s_on_recv = NULL; + +char *wifi_get_ipv4(wifi_interface_t interface); +static esp_err_t sync_peers(void); +static esp_err_t create_candidates(void); + +void peer_get_buffer(char ** buffer, size_t *buffer_len) +{ + if (buffer && buffer_len) { + *buffer = s_buffer; + *buffer_len = MAX_BUFFER_SIZE; + } +} + +void peer_send(char* data, size_t size) +{ + juice_send(s_agent, data, size); +} + +esp_err_t peer_init(on_peer_recv_t cb) +{ + esp_err_t ret = ESP_FAIL; + ESP_GOTO_ON_FALSE(cb, ESP_ERR_INVALID_ARG, err, TAG, "Invalid peer receive callback"); + ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates"); + ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer"); + EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); + if (bits & PEER_CONNECTED) { + ESP_LOGI(TAG, "Peer is connected!"); + return ESP_OK; + } +err: + ESP_LOGE(TAG, "Failed to init peer"); + return ret; +} + +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) +{ + esp_mqtt_event_handle_t event = event_data; + esp_mqtt_client_handle_t client = event->client; + switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_CONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) { + ESP_LOGE(TAG, "Failed to subscribe to the sync topic"); + } + xEventGroupSetBits(s_state, PEER_SYNC0); + break; + case MQTT_EVENT_DISCONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); + xEventGroupSetBits(s_state, PEER_FAIL); + break; + + case MQTT_EVENT_DATA: + ESP_LOGI(TAG, "MQTT_EVENT_DATA"); + printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); + printf("DATA=%.*s\r\n", event->data_len, event->data); + if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) { + break; + } + EventBits_t bits = xEventGroupGetBits(s_state); + if (event->data_len > 1 && s_agent) { + cJSON *root = cJSON_Parse(event->data); + if (root == NULL) { + break; + } + cJSON *desc = cJSON_GetObjectItem(root, "desc"); + if (desc == NULL) { + cJSON_Delete(root); + break; + } + printf("desc->valuestring:%s\n", desc->valuestring); + juice_set_remote_description(s_agent, desc->valuestring); + char cand_name[] = "cand0"; + while (true) { + cJSON *cand = cJSON_GetObjectItem(root, cand_name); + if (cand == NULL) { + break; + } + printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring); + juice_add_remote_candidate(s_agent, cand->valuestring); + cand_name[4]++; + } + cJSON_Delete(root); + xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process + // and destroy the mqtt client + } +#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 + if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) { + if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) { + xEventGroupSetBits(s_state, PEER_SYNC2); + } else { + xEventGroupSetBits(s_state, PEER_FAIL); + } + } +#else + if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) { + if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) { + xEventGroupSetBits(s_state, PEER_SYNC1); + } else { + xEventGroupSetBits(s_state, PEER_FAIL); + } + } else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) { + xEventGroupSetBits(s_state, PEER_SYNC2); + } +#endif + break; + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + xEventGroupSetBits(s_state, PEER_FAIL); + break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; + } +} + +static esp_err_t sync_peers(void) +{ + esp_err_t ret = ESP_OK; + esp_mqtt_client_config_t mqtt_cfg = { + .broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI, + .task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE, + }; + esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); + ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL), + err, TAG, "Failed to register mqtt event handler"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client"); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), + ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); + ESP_LOGI(TAG, "Waiting for the other peer..."); + const int max_sync_retry = 60; + int retry = 0; + while (true) { + EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000)); + if (bits & PEER_SYNC2) { + break; + } + if (bits & PEER_SYNC1) { + continue; + } + ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer"); + ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry); +#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 + ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0, + ESP_FAIL, TAG, "Failed to publish mqtt message"); +#endif + } + ESP_LOGI(TAG, "Sync done"); + ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0, + ESP_FAIL, TAG, "Failed to publish peer's description"); + ESP_LOGI(TAG, "Waiting for the other peer description and candidates..."); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), + ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates"); +err: + free(s_peer_desc); + esp_mqtt_client_destroy(client); + return ret; +} + +static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr) +{ + ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state)); + if (state == JUICE_STATE_CONNECTED) { + xEventGroupSetBits(s_state, PEER_CONNECTED); + } else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) { + esp_restart(); + } +} + +static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr) +{ + static uint8_t cand_nr = 0; + if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates + char cand_name[] = "cand0"; + cand_name[4] += cand_nr++; + cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp); + } +} + +static void juice_gathering_done(juice_agent_t *agent, void *user_ptr) +{ + ESP_LOGI(TAG, "Gathering done"); + if (s_state) { + xEventGroupSetBits(s_state, PEER_GATHER_DONE); + } +} + + + +static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) +{ + if (s_local_mqtt) { + s_on_recv(data, size); + } +} + +static esp_err_t create_candidates(void) +{ + ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); + s_peer_desc_json = cJSON_CreateObject(); + esp_err_t ret = ESP_OK; + juice_set_log_level(JUICE_LOG_LEVEL_INFO); + juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER, + .bind_address = wifi_get_ipv4(WIFI_IF_STA), + .stun_server_port = 19302, + .cb_state_changed = juice_state, + .cb_candidate = juice_candidate, + .cb_gathering_done = juice_gathering_done, + .cb_recv = juice_recv, + }; + + s_agent = juice_create(&config); + ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent"); + ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS, + ESP_FAIL, err, TAG, "Failed to get local description"); + ESP_LOGI(TAG, "desc: %s", s_buffer); + cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer); + + ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS, + ESP_FAIL, err, TAG, "Failed to start gathering candidates"); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)), + ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); + s_peer_desc = cJSON_Print(s_peer_desc_json); + ESP_LOGI(TAG, "desc: %s", s_peer_desc); + cJSON_Delete(s_peer_desc_json); + return ESP_OK; + +err: + juice_destroy(s_agent); + s_agent = NULL; + cJSON_Delete(s_peer_desc_json); + s_peer_desc_json = NULL; + return ret; +} diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl.h b/components/mosquitto/examples/serverless_mqtt/main/peer_impl.h new file mode 100644 index 0000000000..e96202c4b9 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl.h @@ -0,0 +1,18 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ + +#include +#include "esp_random.h" +#include "esp_sleep.h" +#include "mosq_broker.h" + +typedef void (*on_peer_recv_t)(const char *data, size_t size); + +esp_err_t peer_init(on_peer_recv_t cb); + +void peer_get_buffer(char ** buffer, size_t *buffer_len); + +void peer_send(char* data, size_t size); diff --git a/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c b/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c index 8dd0bee7c7..bd8b469c6d 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c +++ b/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD + * SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD * * SPDX-License-Identifier: Unlicense OR CC0-1.0 */ @@ -13,30 +13,9 @@ #include "esp_check.h" #include "esp_sleep.h" #include "mosq_broker.h" -#include "juice/juice.h" -#include "cJSON.h" +#include "peer_impl.h" -#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1) -#define OUR_PEER "1" -#define THEIR_PEER "2" -#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2) -#define OUR_PEER "2" -#define THEIR_PEER "1" -#endif - -#define PEER_SYNC0 BIT(0) -#define PEER_SYNC1 BIT(1) -#define PEER_SYNC2 BIT(2) -#define PEER_FAIL BIT(3) -#define PEER_GATHER_DONE BIT(4) -#define PEER_DESC_PUBLISHED BIT(5) -#define PEER_CONNECTED BIT(6) - -#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL) - -#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER -#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER -#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN +#define ALIGN(size) (((size) + 3U) & ~(3U)) typedef struct message_wrap { uint16_t topic_len; @@ -44,196 +23,18 @@ typedef struct message_wrap { char data[]; } __attribute__((packed)) message_wrap_t; -static const char *TAG = "serverless_mqtt" OUR_PEER; -static char s_buffer[MAX_BUFFER_SIZE]; -static EventGroupHandle_t s_state = NULL; -static juice_agent_t *s_agent = NULL; -static cJSON *s_peer_desc_json = NULL; -static char *s_peer_desc = NULL; +static const char *TAG = "serverless_mqtt"; + static esp_mqtt_client_handle_t s_local_mqtt = NULL; char *wifi_get_ipv4(wifi_interface_t interface); esp_err_t wifi_connect(void); -static esp_err_t sync_peers(void); -static esp_err_t create_candidates(void); static esp_err_t create_local_client(void); static esp_err_t create_local_broker(void); -void app_main(void) -{ - __attribute__((__unused__)) esp_err_t ret; - ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi"); - ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker"); - ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates"); - ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer"); - EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); - if (bits & PEER_CONNECTED) { - ESP_LOGI(TAG, "Peer is connected!"); - ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client"); - ESP_LOGI(TAG, "Everything is ready, exiting main task"); - return; - } -err: - ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)"); - esp_deep_sleep(1000000LL * (esp_random() % 20)); -} - -static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) -{ - esp_mqtt_event_handle_t event = event_data; - esp_mqtt_client_handle_t client = event->client; - switch ((esp_mqtt_event_id_t)event_id) { - case MQTT_EVENT_CONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); - if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) { - ESP_LOGE(TAG, "Failed to subscribe to the sync topic"); - } - xEventGroupSetBits(s_state, PEER_SYNC0); - break; - case MQTT_EVENT_DISCONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); - xEventGroupSetBits(s_state, PEER_FAIL); - break; - - case MQTT_EVENT_DATA: - ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); - printf("DATA=%.*s\r\n", event->data_len, event->data); - if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) { - break; - } - EventBits_t bits = xEventGroupGetBits(s_state); - if (event->data_len > 1 && s_agent) { - cJSON *root = cJSON_Parse(event->data); - if (root == NULL) { - break; - } - cJSON *desc = cJSON_GetObjectItem(root, "desc"); - if (desc == NULL) { - cJSON_Delete(root); - break; - } - printf("desc->valuestring:%s\n", desc->valuestring); - juice_set_remote_description(s_agent, desc->valuestring); - char cand_name[] = "cand0"; - while (true) { - cJSON *cand = cJSON_GetObjectItem(root, cand_name); - if (cand == NULL) { - break; - } - printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring); - juice_add_remote_candidate(s_agent, cand->valuestring); - cand_name[4]++; - } - cJSON_Delete(root); - xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process - // and destroy the mqtt client - } -#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 - if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) { - if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) { - xEventGroupSetBits(s_state, PEER_SYNC2); - } else { - xEventGroupSetBits(s_state, PEER_FAIL); - } - } -#else - if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) { - if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) { - xEventGroupSetBits(s_state, PEER_SYNC1); - } else { - xEventGroupSetBits(s_state, PEER_FAIL); - } - } else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) { - xEventGroupSetBits(s_state, PEER_SYNC2); - } -#endif - break; - case MQTT_EVENT_ERROR: - ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); - xEventGroupSetBits(s_state, PEER_FAIL); - break; - default: - ESP_LOGI(TAG, "Other event id:%d", event->event_id); - break; - } -} +esp_err_t peer_init(on_peer_recv_t cb); -static esp_err_t sync_peers(void) -{ - esp_err_t ret = ESP_OK; - esp_mqtt_client_config_t mqtt_cfg = { - .broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI, - .task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE, - }; - esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); - ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client"); - ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL), - err, TAG, "Failed to register mqtt event handler"); - ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client"); - ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), - ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); - ESP_LOGI(TAG, "Waiting for the other peer..."); - const int max_sync_retry = 60; - int retry = 0; - while (true) { - EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000)); - if (bits & PEER_SYNC2) { - break; - } - if (bits & PEER_SYNC1) { - continue; - } - ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer"); - ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry); -#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 - ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0, - ESP_FAIL, TAG, "Failed to publish mqtt message"); -#endif - } - ESP_LOGI(TAG, "Sync done"); - ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0, - ESP_FAIL, TAG, "Failed to publish peer's description"); - ESP_LOGI(TAG, "Waiting for the other peer description and candidates..."); - ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), - ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates"); -err: - free(s_peer_desc); - esp_mqtt_client_destroy(client); - return ret; -} - -static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr) -{ - ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state)); - if (state == JUICE_STATE_CONNECTED) { - xEventGroupSetBits(s_state, PEER_CONNECTED); - } else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) { - esp_restart(); - } -} - -static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr) -{ - static uint8_t cand_nr = 0; - if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates - char cand_name[] = "cand0"; - cand_name[4] += cand_nr++; - cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp); - } -} - -static void juice_gathering_done(juice_agent_t *agent, void *user_ptr) -{ - ESP_LOGI(TAG, "Gathering done"); - if (s_state) { - xEventGroupSetBits(s_state, PEER_GATHER_DONE); - } -} - -#define ALIGN(size) (((size) + 3U) & ~(3U)) - -static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) +static void peer_recv(const char *data, size_t size) { if (s_local_mqtt) { message_wrap_t *message = (message_wrap_t *)data; @@ -250,45 +51,21 @@ static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void ESP_LOGI(TAG, "forwarding remote message: payload:%.*s", payload_len, payload); esp_mqtt_client_publish(s_local_mqtt, topic, payload, payload_len, 0, 0); } + } -static esp_err_t create_candidates(void) +void app_main(void) { - ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); - s_peer_desc_json = cJSON_CreateObject(); - esp_err_t ret = ESP_OK; - juice_set_log_level(JUICE_LOG_LEVEL_INFO); - juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER, - .bind_address = wifi_get_ipv4(WIFI_IF_STA), - .stun_server_port = 19302, - .cb_state_changed = juice_state, - .cb_candidate = juice_candidate, - .cb_gathering_done = juice_gathering_done, - .cb_recv = juice_recv, - }; - - s_agent = juice_create(&config); - ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent"); - ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS, - ESP_FAIL, err, TAG, "Failed to get local description"); - ESP_LOGI(TAG, "desc: %s", s_buffer); - cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer); - - ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS, - ESP_FAIL, err, TAG, "Failed to start gathering candidates"); - ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)), - ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); - s_peer_desc = cJSON_Print(s_peer_desc_json); - ESP_LOGI(TAG, "desc: %s", s_peer_desc); - cJSON_Delete(s_peer_desc_json); - return ESP_OK; - + __attribute__((__unused__)) esp_err_t ret; + ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi"); + ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker"); + ESP_GOTO_ON_ERROR(peer_init(peer_recv), err, TAG, "Failed to init peer library"); + ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client"); + ESP_LOGI(TAG, "Everything is ready, exiting main task"); + return; err: - juice_destroy(s_agent); - s_agent = NULL; - cJSON_Delete(s_peer_desc_json); - s_peer_desc_json = NULL; - return ret; + ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)"); + esp_deep_sleep(1000000LL * (esp_random() % 20)); } static void local_handler(void *args, esp_event_base_t base, int32_t id, void *data) @@ -335,18 +112,26 @@ static esp_err_t create_local_client(void) static void handle_message(char *client, char *topic, char *payload, int len, int qos, int retain) { - if (client && strcmp(client, "local_mqtt") == 0 ) { + if (client && strcmp(client, "local_mqtt") == 0) { // This is our little local client -- do not forward return; } ESP_LOGI(TAG, "handle_message topic:%s", topic); ESP_LOGI(TAG, "handle_message data:%.*s", len, payload); ESP_LOGI(TAG, "handle_message qos=%d, retain=%d", qos, retain); - if (s_local_mqtt && s_agent) { + if (s_local_mqtt) { + static char *s_buffer = NULL; + static size_t s_buffer_len = 0; + if (s_buffer == NULL || s_buffer_len == 0) { + peer_get_buffer(&s_buffer, &s_buffer_len); + if (s_buffer == NULL || s_buffer_len == 0) { + return; + } + } int topic_len = strlen(topic) + 1; // null term int topic_len_aligned = ALIGN(topic_len); int total_msg_len = 2 + 2 /* msg_wrap header */ + topic_len_aligned + len; - if (total_msg_len > MAX_BUFFER_SIZE) { + if (total_msg_len > s_buffer_len) { ESP_LOGE(TAG, "Fail to forward, message too long"); return; } @@ -356,7 +141,7 @@ static void handle_message(char *client, char *topic, char *payload, int len, in memcpy(s_buffer + 4, topic, topic_len); memcpy(s_buffer + 4 + topic_len_aligned, payload, len); - juice_send(s_agent, s_buffer, total_msg_len); + peer_send(s_buffer, total_msg_len); } } diff --git a/components/mosquitto/examples/serverless_mqtt/main/webrtc.c b/components/mosquitto/examples/serverless_mqtt/main/webrtc.c index c506e8cf24..5966ae64a7 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/webrtc.c +++ b/components/mosquitto/examples/serverless_mqtt/main/webrtc.c @@ -3,15 +3,6 @@ * * SPDX-License-Identifier: Unlicense OR CC0-1.0 */ -/* OpenAI realtime communication Demo code - - This example code is in the Public Domain (or CC0 licensed, at your option.) - - Unless required by applicable law or agreed to in writing, this - software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - CONDITIONS OF ANY KIND, either express or implied. -*/ - #include "media_lib_os.h" #include "esp_log.h" #include "esp_webrtc_defaults.h" @@ -20,7 +11,6 @@ #include "esp_timer.h" #include "esp_random.h" - bool network_is_connected(void); /** * @brief Start WebRTC From 2b9345e908f2690fc594c660bda6c576313079fb Mon Sep 17 00:00:00 2001 From: David Cermak Date: Fri, 11 Jul 2025 17:08:57 +0200 Subject: [PATCH 5/6] fix(mosq): Add WebRTC implementation of peer_backend --- .../examples/serverless_mqtt/CMakeLists.txt | 2 +- ...default-event-to-http-client-handler.patch | 0 .../Remove-deprecated-freeRTOS-header.patch | 16 +- .../install.sh} | 2 + ...eer-Add-direct-dependency-to-libsrtp.patch | 23 ++ .../serverless_mqtt/main/CMakeLists.txt | 6 +- .../serverless_mqtt/main/Kconfig.projbuild | 50 ++- .../serverless_mqtt/main/idf_component.yml | 1 + .../main/{peer_impl.c => peer_impl_juice.c} | 1 + .../serverless_mqtt/main/peer_impl_webrtc.c | 227 +++++++++++++ .../examples/serverless_mqtt/main/webrtc.c | 301 ------------------ 11 files changed, 304 insertions(+), 325 deletions(-) rename components/mosquitto/examples/serverless_mqtt/{ => esp_peer_setup}/Add-default-event-to-http-client-handler.patch (100%) rename components/mosquitto/examples/serverless_mqtt/{ => esp_peer_setup}/Remove-deprecated-freeRTOS-header.patch (61%) rename components/mosquitto/examples/serverless_mqtt/{esp_peer_setup.sh => esp_peer_setup/install.sh} (89%) create mode 100644 components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch rename components/mosquitto/examples/serverless_mqtt/main/{peer_impl.c => peer_impl_juice.c} (99%) create mode 100644 components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c delete mode 100644 components/mosquitto/examples/serverless_mqtt/main/webrtc.c diff --git a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt index 9d12ff1ecc..9633f279d6 100644 --- a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt @@ -7,7 +7,7 @@ include($ENV{IDF_PATH}/tools/cmake/project.cmake) # Setup ESP-PEER from GitHub repo (but it's supported only on certain targets) set(ESP_PEER_COMPATIBLE_TARGETS "esp32s2" "esp32s3" "esp32p4" "esp32") if(IDF_TARGET IN_LIST ESP_PEER_COMPATIBLE_TARGETS) -execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup.sh +execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup/install.sh ${CMAKE_BINARY_DIR} WORKING_DIRECTORY ${CMAKE_BINARY_DIR} RESULT_VARIABLE script_result) diff --git a/components/mosquitto/examples/serverless_mqtt/Add-default-event-to-http-client-handler.patch b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Add-default-event-to-http-client-handler.patch similarity index 100% rename from components/mosquitto/examples/serverless_mqtt/Add-default-event-to-http-client-handler.patch rename to components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Add-default-event-to-http-client-handler.patch diff --git a/components/mosquitto/examples/serverless_mqtt/Remove-deprecated-freeRTOS-header.patch b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Remove-deprecated-freeRTOS-header.patch similarity index 61% rename from components/mosquitto/examples/serverless_mqtt/Remove-deprecated-freeRTOS-header.patch rename to components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Remove-deprecated-freeRTOS-header.patch index ab5cd3d6bb..37c4d797fb 100644 --- a/components/mosquitto/examples/serverless_mqtt/Remove-deprecated-freeRTOS-header.patch +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Remove-deprecated-freeRTOS-header.patch @@ -1,24 +1,28 @@ -From 06b15a30e860daef303b6c6541a2cae858c17e6c Mon Sep 17 00:00:00 2001 +From cdc43a56f5ea1ab1935f55f47f8644f5dd30825e Mon Sep 17 00:00:00 2001 From: David Cermak Date: Thu, 10 Jul 2025 11:09:57 +0200 Subject: [PATCH] fix(media_lib): Remove deprecated freeRTOS header --- - components/media_lib_sal/port/media_lib_os_freertos.c | 2 +- - 1 file changed, 1 insertion(+), 1 deletion(-) + components/media_lib_sal/port/media_lib_os_freertos.c | 4 ++++ + 1 file changed, 4 insertions(+) diff --git a/components/media_lib_sal/port/media_lib_os_freertos.c b/components/media_lib_sal/port/media_lib_os_freertos.c -index d248d59..7b588ca 100644 +index d248d59..aea0527 100644 --- a/components/media_lib_sal/port/media_lib_os_freertos.c +++ b/components/media_lib_sal/port/media_lib_os_freertos.c -@@ -40,7 +40,7 @@ +@@ -40,8 +40,12 @@ #include "esp_idf_version.h" #if CONFIG_FREERTOS_ENABLE_TASK_SNAPSHOT --#include "freertos/task_snapshot.h" ++#if (ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 2, 0)) +#include "esp_private/freertos_debug.h" ++#else + #include "freertos/task_snapshot.h" #endif ++#endif #ifdef __XTENSA__ + #include "esp_debug_helpers.h" -- 2.43.0 diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup.sh b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh similarity index 89% rename from components/mosquitto/examples/serverless_mqtt/esp_peer_setup.sh rename to components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh index f1f56ce4e7..599f6ba661 100755 --- a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup.sh +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh @@ -14,6 +14,7 @@ COMPONENTS_SRC="${EXTRACTED_DIR}/components" COMPONENTS_DST="${ESP_PEER_DIR}/components" PATCH_FILE_1="${THIS_DIR}/Add-default-event-to-http-client-handler.patch" PATCH_FILE_2="${THIS_DIR}/Remove-deprecated-freeRTOS-header.patch" +PATCH_FILE_3="${THIS_DIR}/libpeer-Add-direct-dependency-to-libsrtp.patch" # Download if not exists if [ ! -d "$EXTRACTED_DIR" ]; then @@ -22,6 +23,7 @@ if [ ! -d "$EXTRACTED_DIR" ]; then unzip -o "$ZIP_PATH" -d "$ESP_PEER_DIR" patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_1" patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_2" + patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_3" mv ${EXTRACTED_DIR}/components ${ESP_PEER_DIR} mv ${ESP_PEER_DIR}/components/esp_webrtc/impl/peer_default ${ESP_PEER_DIR}/components fi diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch new file mode 100644 index 0000000000..2275383f24 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch @@ -0,0 +1,23 @@ +From 695e057000698f4897b6c5802851499842e2fe31 Mon Sep 17 00:00:00 2001 +From: David Cermak +Date: Fri, 11 Jul 2025 16:59:21 +0200 +Subject: [PATCH] fix(libpeer): Add direct dependency to libsrtp + +--- + components/esp_webrtc/impl/peer_default/CMakeLists.txt | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/components/esp_webrtc/impl/peer_default/CMakeLists.txt b/components/esp_webrtc/impl/peer_default/CMakeLists.txt +index 2af35cf..3fb4615 100644 +--- a/components/esp_webrtc/impl/peer_default/CMakeLists.txt ++++ b/components/esp_webrtc/impl/peer_default/CMakeLists.txt +@@ -2,6 +2,6 @@ idf_component_register(INCLUDE_DIRS ./include) + + get_filename_component(BASE_DIR ${CMAKE_CURRENT_SOURCE_DIR} NAME) + add_prebuilt_library(${BASE_DIR} "${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}/libpeer_default.a" +- PRIV_REQUIRES ${BASE_DIR} esp_timer) ++ PRIV_REQUIRES ${BASE_DIR} esp_timer espressif__esp_libsrtp) + target_link_libraries(${COMPONENT_LIB} INTERFACE "-L ${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}") + target_link_libraries(${COMPONENT_LIB} INTERFACE peer_default) +-- +2.43.0 diff --git a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt index 037e6aee73..52a905ee20 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt @@ -1,8 +1,10 @@ if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER) - set(PEER_BACKEND_SRC "webrtc.c") + set(PEER_BACKEND_SRC "peer_impl_webrtc.c") +else() + set(PEER_BACKEND_SRC "peer_impl_juice.c") endif() -idf_component_register(SRCS "serverless_mqtt.c" "peer_impl.c" +idf_component_register(SRCS "serverless_mqtt.c" "wifi_connect.c" "${PEER_BACKEND_SRC}" INCLUDE_DIRS "." diff --git a/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild index d907bc2b30..0714096620 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild +++ b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild @@ -33,8 +33,40 @@ menu "Example Configuration" WiFi station password for the example to use. endmenu + choice EXAMPLE_PEER_LIB + prompt "Choose peer library" + default EXAMPLE_PEER_LIB_LIBJUICE + help + Choose the peer library to use for WebRTC communication. + libjuice: Use libjuice library for ICE/STUN/TURN (Performs manual signalling) + esp_peer: Use ESP-IDF specific peer library + + config EXAMPLE_PEER_LIB_ESP_PEER + bool "esp_peer" + + config EXAMPLE_PEER_LIB_LIBJUICE + bool "libjuice" + endchoice + + config EXAMPLE_WEBRTC_URL + string "WebRTC server URL" + depends on EXAMPLE_PEER_LIB_ESP_PEER + default "https://webrtc.espressif.com/join/" + help + URL of WebRTC remote endpoint. + + config EXAMPLE_WEBRTC_ROOM_ID + string "WebRTC room ID" + depends on EXAMPLE_PEER_LIB_ESP_PEER + default "12345" + help + Room ID for WebRTC synchronisation. + Could be a random number, but the same for both peers. + + config EXAMPLE_MQTT_BROKER_URI string "MQTT Broker URL" + depends on EXAMPLE_PEER_LIB_LIBJUICE default "mqtt://mqtt.eclipseprojects.io" help URL of the mqtt broker use for synchronisation and exchanging @@ -42,12 +74,14 @@ menu "Example Configuration" config EXAMPLE_MQTT_SYNC_TOPIC string "MQTT topic for synchronisation" + depends on EXAMPLE_PEER_LIB_LIBJUICE default "/topic/serverless_mqtt" help MQTT topic used fo synchronisation. config EXAMPLE_STUN_SERVER string "Hostname of STUN server" + depends on EXAMPLE_PEER_LIB_LIBJUICE default "stun.l.google.com" help STUN server hostname. @@ -67,6 +101,7 @@ menu "Example Configuration" choice EXAMPLE_SERVERLESS_ROLE prompt "Choose your role" + depends on EXAMPLE_PEER_LIB_LIBJUICE default EXAMPLE_SERVERLESS_ROLE_PEER1 help Choose either peer1 or peer2. @@ -82,19 +117,4 @@ menu "Example Configuration" bool "peer2" endchoice - choice EXAMPLE_PEER_LIB - prompt "Choose peer library" - default EXAMPLE_PEER_LIB_LIBJUICE - help - Choose the peer library to use for WebRTC communication. - libjuice: Use libjuice library for ICE/STUN/TURN (Performs manual signalling) - esp_peer: Use ESP-IDF specific peer library - - config EXAMPLE_PEER_LIB_ESP_PEER - bool "esp_peer" - - config EXAMPLE_PEER_LIB_LIBJUICE - bool "libjuice" - endchoice - endmenu diff --git a/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml b/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml index e3297d5351..f71e383ad7 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml +++ b/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml @@ -3,3 +3,4 @@ dependencies: espressif/mosquitto: override_path: ../../.. espressif/sock_utils: "*" + espressif/esp_libsrtp: "~1.0.0" diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl.c b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_juice.c similarity index 99% rename from components/mosquitto/examples/serverless_mqtt/main/peer_impl.c rename to components/mosquitto/examples/serverless_mqtt/main/peer_impl_juice.c index a7b9b66d36..87415df453 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/peer_impl.c +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_juice.c @@ -66,6 +66,7 @@ esp_err_t peer_init(on_peer_recv_t cb) { esp_err_t ret = ESP_FAIL; ESP_GOTO_ON_FALSE(cb, ESP_ERR_INVALID_ARG, err, TAG, "Invalid peer receive callback"); + s_on_recv = cb; ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates"); ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer"); EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c new file mode 100644 index 0000000000..b85fda7296 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c @@ -0,0 +1,227 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include "media_lib_os.h" +#include "freertos/FreeRTOS.h" +#include "freertos/event_groups.h" +#include "esp_log.h" +#include "esp_webrtc_defaults.h" +#include "esp_peer_default.h" +#include "common.h" +#include "esp_check.h" +#include "peer_impl.h" +#include "sdkconfig.h" + +#define WEBRTC_URL (CONFIG_EXAMPLE_WEBRTC_URL CONFIG_EXAMPLE_WEBRTC_ROOM_ID) +#define PEER_CONNECTED BIT(0) +#define PEER_DISCONNECTED BIT(1) +#define MAX_BUFFER_SIZE (4*1024) + +static EventGroupHandle_t s_state = NULL; +static const char *TAG = "serverless_mqtt_webrtc"; + +void peer_get_buffer(char ** buffer, size_t *buffer_len) +{ + static char s_buffer[MAX_BUFFER_SIZE]; + if (buffer && buffer_len) { + *buffer = s_buffer; + *buffer_len = MAX_BUFFER_SIZE; + } +} + +static int start_webrtc(char *url); + +static on_peer_recv_t s_on_recv = NULL; + +esp_err_t peer_init(on_peer_recv_t cb) +{ + esp_err_t ret = ESP_OK; + s_on_recv = cb; + s_state = xEventGroupCreate(); + ESP_RETURN_ON_FALSE(s_state, ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); + ESP_GOTO_ON_FALSE(start_webrtc(WEBRTC_URL) == ESP_PEER_ERR_NONE, ESP_FAIL, err, TAG, "Failed to start webRTC"); + + EventBits_t bits = xEventGroupWaitBits(s_state, PEER_DISCONNECTED | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); + if (bits & PEER_CONNECTED) { + ESP_LOGI(TAG, "Peer is connected!"); + return ret; + } +err: + vEventGroupDelete(s_state); + return ret; +} + +static int stop_webrtc(void); + +static esp_peer_signaling_handle_t signaling = NULL; +static esp_peer_handle_t peer = NULL; +static bool peer_running = false; + +static int peer_state_handler(esp_peer_state_t state, void* ctx) +{ + if (state == ESP_PEER_STATE_CONNECTED) { + xEventGroupSetBits(s_state, PEER_CONNECTED); + } else if (state == ESP_PEER_STATE_DISCONNECTED) { + xEventGroupSetBits(s_state, PEER_DISCONNECTED); + } + return 0; +} + +static int peer_msg_handler(esp_peer_msg_t* msg, void* ctx) +{ + if (msg->type == ESP_PEER_MSG_TYPE_SDP) { + // Send local SDP to signaling server + esp_peer_signaling_send_msg(signaling, (esp_peer_signaling_msg_t *)msg); + } + return 0; +} + +static int peer_video_info_handler(esp_peer_video_stream_info_t* info, void* ctx) +{ + return 0; +} + +static int peer_audio_info_handler(esp_peer_audio_stream_info_t* info, void* ctx) +{ + return 0; +} + +static int peer_audio_data_handler(esp_peer_audio_frame_t* frame, void* ctx) +{ + ESP_LOGI(TAG, "Audio Sequence %d(%d)", (int)frame->pts, (int)frame->data[0]); + return 0; +} + +static int peer_video_data_handler(esp_peer_video_frame_t* frame, void* ctx) +{ + return 0; +} + +static int peer_data_handler(esp_peer_data_frame_t* frame, void* ctx) +{ + if (frame && frame->size > 0) { + s_on_recv((char*)frame->data, frame->size); + } + return 0; +} + +static void pc_task(void *arg) +{ + while (peer_running) { + esp_peer_main_loop(peer); + media_lib_thread_sleep(20); + } + media_lib_thread_destroy(NULL); +} + +static int signaling_ice_info_handler(esp_peer_signaling_ice_info_t* info, void* ctx) +{ + if (peer == NULL) { + esp_peer_default_cfg_t peer_cfg = { + .agent_recv_timeout = 500, + }; + esp_peer_cfg_t cfg = { + .server_lists = &info->server_info, + .server_num = 1, + .audio_dir = ESP_PEER_MEDIA_DIR_SEND_RECV, + .audio_info = { + .codec = ESP_PEER_AUDIO_CODEC_G711A, + }, + .enable_data_channel = true, + .role = info->is_initiator ? ESP_PEER_ROLE_CONTROLLING : ESP_PEER_ROLE_CONTROLLED, + .on_state = peer_state_handler, + .on_msg = peer_msg_handler, + .on_video_info = peer_video_info_handler, + .on_audio_info = peer_audio_info_handler, + .on_video_data = peer_video_data_handler, + .on_audio_data = peer_audio_data_handler, + .on_data = peer_data_handler, + .ctx = ctx, + .extra_cfg = &peer_cfg, + .extra_size = sizeof(esp_peer_default_cfg_t), + }; + int ret = esp_peer_open(&cfg, esp_peer_get_default_impl(), &peer); + if (ret != ESP_PEER_ERR_NONE) { + return ret; + } + media_lib_thread_handle_t thread = NULL; + peer_running = true; + media_lib_thread_create_from_scheduler(&thread, "pc_task", pc_task, NULL); + if (thread == NULL) { + peer_running = false; + } + } + return 0; +} + +static int signaling_connected_handler(void* ctx) +{ + if (peer) { + return esp_peer_new_connection(peer); + } + return 0; +} + +static int signaling_msg_handler(esp_peer_signaling_msg_t* msg, void* ctx) +{ + if (msg->type == ESP_PEER_SIGNALING_MSG_BYE) { + esp_peer_close(peer); + peer = NULL; + } else if (msg->type == ESP_PEER_SIGNALING_MSG_SDP) { + // Receive remote SDP + if (peer) { + esp_peer_send_msg(peer, (esp_peer_msg_t*)msg); + } + } + return 0; +} + +static int signaling_close_handler(void *ctx) +{ + return 0; +} + +static int start_signaling(char* url) +{ + esp_peer_signaling_cfg_t cfg = { + .signal_url = url, + .on_ice_info = signaling_ice_info_handler, + .on_connected = signaling_connected_handler, + .on_msg = signaling_msg_handler, + .on_close = signaling_close_handler, + }; + // Use APPRTC signaling + return esp_peer_signaling_start(&cfg, esp_signaling_get_apprtc_impl(), &signaling); +} + +static int start_webrtc(char *url) +{ + stop_webrtc(); + return start_signaling(url); +} + +static int stop_webrtc(void) +{ + peer_running = false; + if (peer) { + esp_peer_close(peer); + peer = NULL; + } + if (signaling) { + esp_peer_signaling_stop(signaling); + signaling = NULL; + } + return 0; +} + +void peer_send(char* data, size_t size) +{ + esp_peer_data_frame_t data_frame = { + .type = ESP_PEER_DATA_CHANNEL_DATA, + .data = (uint8_t*)data, + .size = size, + }; + esp_peer_send_data(peer, &data_frame); +} diff --git a/components/mosquitto/examples/serverless_mqtt/main/webrtc.c b/components/mosquitto/examples/serverless_mqtt/main/webrtc.c deleted file mode 100644 index 5966ae64a7..0000000000 --- a/components/mosquitto/examples/serverless_mqtt/main/webrtc.c +++ /dev/null @@ -1,301 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD - * - * SPDX-License-Identifier: Unlicense OR CC0-1.0 - */ -#include "media_lib_os.h" -#include "esp_log.h" -#include "esp_webrtc_defaults.h" -#include "esp_peer_default.h" -#include "common.h" -#include "esp_timer.h" -#include "esp_random.h" - -bool network_is_connected(void); -/** - * @brief Start WebRTC - * - * @param[in] url Signaling url - * - * @return - * - 0 On success - * - Others Fail to start - */ -int start_webrtc(char *url); - -/** - * @brief Query WebRTC Status - */ -void query_webrtc(void); - -/** - * @brief Stop WebRTC - * - * @return - * - 0 On success - * - Others Fail to stop - */ -int stop_webrtc(void); - -#define TAG "PEER_DEMO" - -#define TEST_PERIOD 1000 - -static esp_peer_signaling_handle_t signaling = NULL; -static esp_peer_handle_t peer = NULL; -static esp_timer_handle_t timer; -static int send_sequence = 0; -static bool peer_running = false; - -typedef struct { - const char* question; - const char* answer; -} data_channel_chat_content_t; - -static data_channel_chat_content_t chat_content[] = { - {"Hi!", "Hello!"}, - {"How are you?", "I am fine."}, - {"Wish to be your friend.", "Great!"}, - {"What's your name?", "I am a chatbot, nice to meet you!"}, - {"What do you do?", "I am here to chat and assist you with various tasks."}, - {"How old are you?", "I don't have an age. I was created to chat with you!"}, - {"Do you have hobbies?", "I enjoy chatting with you and learning new things."}, - {"Tell me a story.", "Once upon a time, a curious cat discovered a magical world..."}, - {"Tell me a joke.", "Why don't skeletons fight each other? They don't have the guts!"}, - {"What is the weather like?", "I am not sure, but you can check your local forecast."}, - {"What is your favorite color?", "I don't have a favorite color, but I like all of them!"}, - {"What is the time?", "Sorry, I can't tell the time. You can check your device for that."}, - {"What can you do?", "I can answer questions, tell jokes, help with tasks, and much more!"}, - {"Where are you from?", "I was created by developers, so I don't have a specific location."}, - {"What is love?", "Love is a complex emotion that connects people. What do you think love is?"}, - {"Do you like music?", "I don't listen to music, but I know about it! What's your favorite genre?"}, - {"Goodbye!", "Bye!"}, -}; - -static void send_cb(void *ctx) -{ - if (peer) { - uint8_t data[64]; - memset(data, (uint8_t)send_sequence, sizeof(data)); - esp_peer_audio_frame_t audio_frame = { - .data = data, - .size = sizeof(data), - .pts = send_sequence, - }; - // Send audio data - esp_peer_send_audio(peer, &audio_frame); - send_sequence++; - int question = esp_random() % (sizeof(chat_content) / sizeof(chat_content[0])); - - // Send question through data channel - esp_peer_data_frame_t data_frame = { - .type = ESP_PEER_DATA_CHANNEL_DATA, - .data = (uint8_t*)chat_content[question].question, - .size = strlen(chat_content[question].question) + 1, - }; - ESP_LOGI(TAG, "Send question:%s", (char*)data_frame.data); - esp_peer_send_data(peer, &data_frame); - } -} - -static int peer_state_handler(esp_peer_state_t state, void* ctx) -{ - if (state == ESP_PEER_STATE_CONNECTED) { - if (timer == NULL) { - esp_timer_create_args_t cfg = { - .callback = send_cb, - .name = "send", - }; - esp_timer_create(&cfg, &timer); - if (timer) { - esp_timer_start_periodic(timer, TEST_PERIOD * 1000); - } - } - } else if (state == ESP_PEER_STATE_DISCONNECTED) { - if (timer) { - esp_timer_stop(timer); - esp_timer_delete(timer); - timer = NULL; - } - } - return 0; -} - -static int peer_msg_handler(esp_peer_msg_t* msg, void* ctx) -{ - if (msg->type == ESP_PEER_MSG_TYPE_SDP) { - // Send local SDP to signaling server - esp_peer_signaling_send_msg(signaling, (esp_peer_signaling_msg_t *)msg); - } - return 0; -} - -static int peer_video_info_handler(esp_peer_video_stream_info_t* info, void* ctx) -{ - return 0; -} - -static int peer_audio_info_handler(esp_peer_audio_stream_info_t* info, void* ctx) -{ - return 0; -} - -static int peer_audio_data_handler(esp_peer_audio_frame_t* frame, void* ctx) -{ - ESP_LOGI(TAG, "Audio Sequence %d(%d)", (int)frame->pts, (int)frame->data[0]); - return 0; -} - -static int peer_video_data_handler(esp_peer_video_frame_t* frame, void* ctx) -{ - return 0; -} - -static int peer_data_handler(esp_peer_data_frame_t* frame, void* ctx) -{ - int ans = -1; - for (int i = 0; i < sizeof(chat_content) / sizeof(chat_content[0]); i++) { - if (strcmp((char*)frame->data, chat_content[i].question) == 0) { - ans = i; - break; - } - } - if (ans >= 0) { - ESP_LOGI(TAG, "Get question:%s", (char*)frame->data); - esp_peer_data_frame_t data_frame = { - .type = ESP_PEER_DATA_CHANNEL_DATA, - .data = (uint8_t*)chat_content[ans].answer, - .size = strlen(chat_content[ans].answer) + 1, - }; - ESP_LOGI(TAG, "Send answer:%s", (char*)data_frame.data); - esp_peer_send_data(peer, &data_frame); - } else { - ESP_LOGI(TAG, "Get answer:%s", (char*)frame->data); - } - return 0; -} - -static void pc_task(void *arg) -{ - while (peer_running) { - esp_peer_main_loop(peer); - media_lib_thread_sleep(20); - } - media_lib_thread_destroy(NULL); -} - -static int signaling_ice_info_handler(esp_peer_signaling_ice_info_t* info, void* ctx) -{ - if (peer == NULL) { - esp_peer_default_cfg_t peer_cfg = { - .agent_recv_timeout = 500, - }; - esp_peer_cfg_t cfg = { - .server_lists = &info->server_info, - .server_num = 1, - .audio_dir = ESP_PEER_MEDIA_DIR_SEND_RECV, - .audio_info = { - .codec = ESP_PEER_AUDIO_CODEC_G711A, - }, - .enable_data_channel = true, - .role = info->is_initiator ? ESP_PEER_ROLE_CONTROLLING : ESP_PEER_ROLE_CONTROLLED, - .on_state = peer_state_handler, - .on_msg = peer_msg_handler, - .on_video_info = peer_video_info_handler, - .on_audio_info = peer_audio_info_handler, - .on_video_data = peer_video_data_handler, - .on_audio_data = peer_audio_data_handler, - .on_data = peer_data_handler, - .ctx = ctx, - .extra_cfg = &peer_cfg, - .extra_size = sizeof(esp_peer_default_cfg_t), - }; - int ret = esp_peer_open(&cfg, esp_peer_get_default_impl(), &peer); - if (ret != ESP_PEER_ERR_NONE) { - return ret; - } - media_lib_thread_handle_t thread = NULL; - peer_running = true; - media_lib_thread_create_from_scheduler(&thread, "pc_task", pc_task, NULL); - if (thread == NULL) { - peer_running = false; - } - } - return 0; -} - -static int signaling_connected_handler(void* ctx) -{ - if (peer) { - return esp_peer_new_connection(peer); - } - return 0; -} - -static int signaling_msg_handler(esp_peer_signaling_msg_t* msg, void* ctx) -{ - if (msg->type == ESP_PEER_SIGNALING_MSG_BYE) { - esp_peer_close(peer); - peer = NULL; - } else if (msg->type == ESP_PEER_SIGNALING_MSG_SDP) { - // Receive remote SDP - if (peer) { - esp_peer_send_msg(peer, (esp_peer_msg_t*)msg); - } - } - return 0; -} - -static int signaling_close_handler(void *ctx) -{ - return 0; -} - -static int start_signaling(char* url) -{ - esp_peer_signaling_cfg_t cfg = { - .signal_url = url, - .on_ice_info = signaling_ice_info_handler, - .on_connected = signaling_connected_handler, - .on_msg = signaling_msg_handler, - .on_close = signaling_close_handler, - }; - // Use APPRTC signaling - return esp_peer_signaling_start(&cfg, esp_signaling_get_apprtc_impl(), &signaling); -} - -int start_webrtc(char *url) -{ - if (network_is_connected() == false) { - ESP_LOGE(TAG, "Wifi not connected yet"); - return -1; - } - stop_webrtc(); - return start_signaling(url); -} - -void query_webrtc(void) -{ - if (peer) { - esp_peer_query(peer); - } -} - -int stop_webrtc(void) -{ - peer_running = false; - if (timer) { - esp_timer_stop(timer); - esp_timer_delete(timer); - timer = NULL; - } - if (peer) { - esp_peer_close(peer); - peer = NULL; - } - if (signaling) { - esp_peer_signaling_stop(signaling); - signaling = NULL; - } - return 0; -} From 0fd4a54a246160c32595bc953ec7938244a106e7 Mon Sep 17 00:00:00 2001 From: David Cermak Date: Fri, 11 Jul 2025 17:09:08 +0200 Subject: [PATCH 6/6] fix(mosq): Reverts relax CI criteria to build on v5.2+ This reverts commit 25671a66c70c73b3142add0948e3150169dfb215. --- .github/workflows/mosq__build.yml | 22 +++++++++++++------ .../serverless_mqtt/main/idf_component.yml | 1 - .../serverless_mqtt/sdkconfig.ci.esp_peer | 2 +- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/.github/workflows/mosq__build.yml b/.github/workflows/mosq__build.yml index 78922ebb61..ee28aa98aa 100644 --- a/.github/workflows/mosq__build.yml +++ b/.github/workflows/mosq__build.yml @@ -13,11 +13,16 @@ jobs: name: Mosquitto build strategy: matrix: - idf_ver: ["latest", "release-v5.5", "release-v5.4", "release-v5.3", "release-v5.2"] + idf_ver: ["latest", "release-v5.5", "release-v5.4", "release-v5.3", "release-v5.2", "release-v5.1"] + example: ["broker", "serverless_mqtt"] + exclude: + - idf_ver: "release-v5.1" + example: "serverless_mqtt" # serverless_mqtt is not supported due to esp-peer + runs-on: ubuntu-22.04 container: espressif/idf:${{ matrix.idf_ver }} env: - TEST_DIR: components/mosquitto/examples + TEST_DIR: components/mosquitto/examples/${{ matrix.example }} TARGET_TEST: broker TARGET_TEST_DIR: build_esp32_default steps: @@ -31,14 +36,17 @@ jobs: . ${IDF_PATH}/export.sh pip install idf-component-manager idf-build-apps --upgrade python ci/build_apps.py -c ${TEST_DIR} -m components/mosquitto/.build-test-rules.yml - # upload only the target test artifacts - cd ${TEST_DIR}/${TARGET_TEST} - ${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR} - zip -qur artifacts.zip ${TARGET_TEST_DIR} + if [ "${{ matrix.example }}" == "${TARGET_TEST}" ]; then + # upload only the target test artifacts + cd ${TEST_DIR} + ${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR} + zip -qur artifacts.zip ${TARGET_TEST_DIR} + fi - uses: actions/upload-artifact@v4 + if: ${{ matrix.example == 'broker' }} with: name: mosq_target_esp32_${{ matrix.idf_ver }} - path: ${{ env.TEST_DIR }}/${{ env.TARGET_TEST }}/artifacts.zip + path: ${{ env.TEST_DIR }}/artifacts.zip if-no-files-found: error test_mosq: diff --git a/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml b/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml index f71e383ad7..e3297d5351 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml +++ b/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml @@ -3,4 +3,3 @@ dependencies: espressif/mosquitto: override_path: ../../.. espressif/sock_utils: "*" - espressif/esp_libsrtp: "~1.0.0" diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer index dfc8f70dfa..a949251a35 100644 --- a/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer @@ -1,3 +1,3 @@ -CONFIG_IDF_TARGET="esp32s3" +CONFIG_IDF_TARGET="esp32" CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=y CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=n