diff --git a/.github/workflows/mosq__build.yml b/.github/workflows/mosq__build.yml index 0182a9f332..ee28aa98aa 100644 --- a/.github/workflows/mosq__build.yml +++ b/.github/workflows/mosq__build.yml @@ -14,10 +14,15 @@ jobs: strategy: matrix: idf_ver: ["latest", "release-v5.5", "release-v5.4", "release-v5.3", "release-v5.2", "release-v5.1"] + example: ["broker", "serverless_mqtt"] + exclude: + - idf_ver: "release-v5.1" + example: "serverless_mqtt" # serverless_mqtt is not supported due to esp-peer + runs-on: ubuntu-22.04 container: espressif/idf:${{ matrix.idf_ver }} env: - TEST_DIR: components/mosquitto/examples + TEST_DIR: components/mosquitto/examples/${{ matrix.example }} TARGET_TEST: broker TARGET_TEST_DIR: build_esp32_default steps: @@ -31,14 +36,17 @@ jobs: . ${IDF_PATH}/export.sh pip install idf-component-manager idf-build-apps --upgrade python ci/build_apps.py -c ${TEST_DIR} -m components/mosquitto/.build-test-rules.yml - # upload only the target test artifacts - cd ${TEST_DIR}/${TARGET_TEST} - ${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR} - zip -qur artifacts.zip ${TARGET_TEST_DIR} + if [ "${{ matrix.example }}" == "${TARGET_TEST}" ]; then + # upload only the target test artifacts + cd ${TEST_DIR} + ${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR} + zip -qur artifacts.zip ${TARGET_TEST_DIR} + fi - uses: actions/upload-artifact@v4 + if: ${{ matrix.example == 'broker' }} with: name: mosq_target_esp32_${{ matrix.idf_ver }} - path: ${{ env.TEST_DIR }}/${{ env.TARGET_TEST }}/artifacts.zip + path: ${{ env.TEST_DIR }}/artifacts.zip if-no-files-found: error test_mosq: diff --git a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt index c9935c9567..9633f279d6 100644 --- a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt @@ -3,4 +3,19 @@ cmake_minimum_required(VERSION 3.16) include($ENV{IDF_PATH}/tools/cmake/project.cmake) + +# Setup ESP-PEER from GitHub repo (but it's supported only on certain targets) +set(ESP_PEER_COMPATIBLE_TARGETS "esp32s2" "esp32s3" "esp32p4" "esp32") +if(IDF_TARGET IN_LIST ESP_PEER_COMPATIBLE_TARGETS) +execute_process(COMMAND ${CMAKE_BINARY_DIR}/../esp_peer_setup/install.sh + ${CMAKE_BINARY_DIR} + WORKING_DIRECTORY ${CMAKE_BINARY_DIR} + RESULT_VARIABLE script_result) + +if(script_result) + message(FATAL_ERROR "Script esp_peer_setup.sh failed with exit code ${script_result}") +endif() +list(APPEND EXTRA_COMPONENT_DIRS "${CMAKE_BINARY_DIR}/esp-peer/components/") +endif() + project(serverless_mqtt) diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Add-default-event-to-http-client-handler.patch b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Add-default-event-to-http-client-handler.patch new file mode 100644 index 0000000000..b84e34f1c2 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Add-default-event-to-http-client-handler.patch @@ -0,0 +1,24 @@ +From a2d51a026449cf1ba2cb53996ce1cdf1f6f078d2 Mon Sep 17 00:00:00 2001 +From: David Cermak +Date: Wed, 9 Jul 2025 12:40:36 +0200 +Subject: [PATCH] fix(htt): Add default event to http-client handler + +--- + components/esp_webrtc/impl/apprtc_signal/https_client.c | 2 ++ + 1 file changed, 2 insertions(+) + +diff --git a/components/esp_webrtc/impl/apprtc_signal/https_client.c b/components/esp_webrtc/impl/apprtc_signal/https_client.c +index b7ee6fc..aca4a4e 100644 +--- a/components/esp_webrtc/impl/apprtc_signal/https_client.c ++++ b/components/esp_webrtc/impl/apprtc_signal/https_client.c +@@ -48,6 +48,8 @@ esp_err_t _http_event_handler(esp_http_client_event_t *evt) + { + http_info_t *info = evt->user_data; + switch (evt->event_id) { ++ default: ++ break; + case HTTP_EVENT_ERROR: + ESP_LOGD(TAG, "HTTP_EVENT_ERROR"); + break; +-- +2.25.1 diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Remove-deprecated-freeRTOS-header.patch b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Remove-deprecated-freeRTOS-header.patch new file mode 100644 index 0000000000..37c4d797fb --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/Remove-deprecated-freeRTOS-header.patch @@ -0,0 +1,28 @@ +From cdc43a56f5ea1ab1935f55f47f8644f5dd30825e Mon Sep 17 00:00:00 2001 +From: David Cermak +Date: Thu, 10 Jul 2025 11:09:57 +0200 +Subject: [PATCH] fix(media_lib): Remove deprecated freeRTOS header + +--- + components/media_lib_sal/port/media_lib_os_freertos.c | 4 ++++ + 1 file changed, 4 insertions(+) + +diff --git a/components/media_lib_sal/port/media_lib_os_freertos.c b/components/media_lib_sal/port/media_lib_os_freertos.c +index d248d59..aea0527 100644 +--- a/components/media_lib_sal/port/media_lib_os_freertos.c ++++ b/components/media_lib_sal/port/media_lib_os_freertos.c +@@ -40,8 +40,12 @@ + #include "esp_idf_version.h" + + #if CONFIG_FREERTOS_ENABLE_TASK_SNAPSHOT ++#if (ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 2, 0)) ++#include "esp_private/freertos_debug.h" ++#else + #include "freertos/task_snapshot.h" + #endif ++#endif + + #ifdef __XTENSA__ + #include "esp_debug_helpers.h" +-- +2.43.0 diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh new file mode 100755 index 0000000000..599f6ba661 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/install.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +set -e +echo "bin_dir: $1" + +bin_dir="$1" +THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +ESP_PEER_VERSION="9512eef258a45aafcaaa309b1a67505c8b500363" + +ESP_PEER_URL="https://github.com/espressif/esp-webrtc-solution/archive/${ESP_PEER_VERSION}.zip" +ESP_PEER_DIR="${bin_dir}/esp-peer" +ZIP_PATH="${bin_dir}/esp-peer.zip" +EXTRACTED_DIR="${ESP_PEER_DIR}/esp-webrtc-solution-${ESP_PEER_VERSION}" +COMPONENTS_SRC="${EXTRACTED_DIR}/components" +COMPONENTS_DST="${ESP_PEER_DIR}/components" +PATCH_FILE_1="${THIS_DIR}/Add-default-event-to-http-client-handler.patch" +PATCH_FILE_2="${THIS_DIR}/Remove-deprecated-freeRTOS-header.patch" +PATCH_FILE_3="${THIS_DIR}/libpeer-Add-direct-dependency-to-libsrtp.patch" + +# Download if not exists +if [ ! -d "$EXTRACTED_DIR" ]; then + echo "Downloading esp-peer ${ESP_PEER_VERSION}..." + wget -O "$ZIP_PATH" "$ESP_PEER_URL" + unzip -o "$ZIP_PATH" -d "$ESP_PEER_DIR" + patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_1" + patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_2" + patch -p1 -d "$EXTRACTED_DIR" < "$PATCH_FILE_3" + mv ${EXTRACTED_DIR}/components ${ESP_PEER_DIR} + mv ${ESP_PEER_DIR}/components/esp_webrtc/impl/peer_default ${ESP_PEER_DIR}/components +fi diff --git a/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch new file mode 100644 index 0000000000..2275383f24 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/esp_peer_setup/libpeer-Add-direct-dependency-to-libsrtp.patch @@ -0,0 +1,23 @@ +From 695e057000698f4897b6c5802851499842e2fe31 Mon Sep 17 00:00:00 2001 +From: David Cermak +Date: Fri, 11 Jul 2025 16:59:21 +0200 +Subject: [PATCH] fix(libpeer): Add direct dependency to libsrtp + +--- + components/esp_webrtc/impl/peer_default/CMakeLists.txt | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/components/esp_webrtc/impl/peer_default/CMakeLists.txt b/components/esp_webrtc/impl/peer_default/CMakeLists.txt +index 2af35cf..3fb4615 100644 +--- a/components/esp_webrtc/impl/peer_default/CMakeLists.txt ++++ b/components/esp_webrtc/impl/peer_default/CMakeLists.txt +@@ -2,6 +2,6 @@ idf_component_register(INCLUDE_DIRS ./include) + + get_filename_component(BASE_DIR ${CMAKE_CURRENT_SOURCE_DIR} NAME) + add_prebuilt_library(${BASE_DIR} "${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}/libpeer_default.a" +- PRIV_REQUIRES ${BASE_DIR} esp_timer) ++ PRIV_REQUIRES ${BASE_DIR} esp_timer espressif__esp_libsrtp) + target_link_libraries(${COMPONENT_LIB} INTERFACE "-L ${CMAKE_CURRENT_SOURCE_DIR}/libs/${IDF_TARGET}") + target_link_libraries(${COMPONENT_LIB} INTERFACE peer_default) +-- +2.43.0 diff --git a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt index b757b72863..52a905ee20 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt +++ b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt @@ -1,4 +1,14 @@ +if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER) + set(PEER_BACKEND_SRC "peer_impl_webrtc.c") +else() + set(PEER_BACKEND_SRC "peer_impl_juice.c") +endif() + idf_component_register(SRCS "serverless_mqtt.c" "wifi_connect.c" + "${PEER_BACKEND_SRC}" INCLUDE_DIRS "." REQUIRES libjuice nvs_flash mqtt json esp_wifi) +if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER) + idf_component_optional_requires(PUBLIC media_lib_sal esp_webrtc peer_default) +endif() diff --git a/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild index 7e0e97de03..0714096620 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild +++ b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild @@ -33,8 +33,40 @@ menu "Example Configuration" WiFi station password for the example to use. endmenu + choice EXAMPLE_PEER_LIB + prompt "Choose peer library" + default EXAMPLE_PEER_LIB_LIBJUICE + help + Choose the peer library to use for WebRTC communication. + libjuice: Use libjuice library for ICE/STUN/TURN (Performs manual signalling) + esp_peer: Use ESP-IDF specific peer library + + config EXAMPLE_PEER_LIB_ESP_PEER + bool "esp_peer" + + config EXAMPLE_PEER_LIB_LIBJUICE + bool "libjuice" + endchoice + + config EXAMPLE_WEBRTC_URL + string "WebRTC server URL" + depends on EXAMPLE_PEER_LIB_ESP_PEER + default "https://webrtc.espressif.com/join/" + help + URL of WebRTC remote endpoint. + + config EXAMPLE_WEBRTC_ROOM_ID + string "WebRTC room ID" + depends on EXAMPLE_PEER_LIB_ESP_PEER + default "12345" + help + Room ID for WebRTC synchronisation. + Could be a random number, but the same for both peers. + + config EXAMPLE_MQTT_BROKER_URI string "MQTT Broker URL" + depends on EXAMPLE_PEER_LIB_LIBJUICE default "mqtt://mqtt.eclipseprojects.io" help URL of the mqtt broker use for synchronisation and exchanging @@ -42,12 +74,14 @@ menu "Example Configuration" config EXAMPLE_MQTT_SYNC_TOPIC string "MQTT topic for synchronisation" + depends on EXAMPLE_PEER_LIB_LIBJUICE default "/topic/serverless_mqtt" help MQTT topic used fo synchronisation. config EXAMPLE_STUN_SERVER string "Hostname of STUN server" + depends on EXAMPLE_PEER_LIB_LIBJUICE default "stun.l.google.com" help STUN server hostname. @@ -67,6 +101,7 @@ menu "Example Configuration" choice EXAMPLE_SERVERLESS_ROLE prompt "Choose your role" + depends on EXAMPLE_PEER_LIB_LIBJUICE default EXAMPLE_SERVERLESS_ROLE_PEER1 help Choose either peer1 or peer2. diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl.h b/components/mosquitto/examples/serverless_mqtt/main/peer_impl.h new file mode 100644 index 0000000000..e96202c4b9 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl.h @@ -0,0 +1,18 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ + +#include +#include "esp_random.h" +#include "esp_sleep.h" +#include "mosq_broker.h" + +typedef void (*on_peer_recv_t)(const char *data, size_t size); + +esp_err_t peer_init(on_peer_recv_t cb); + +void peer_get_buffer(char ** buffer, size_t *buffer_len); + +void peer_send(char* data, size_t size); diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl_juice.c b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_juice.c new file mode 100644 index 0000000000..87415df453 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_juice.c @@ -0,0 +1,281 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include +#include "freertos/FreeRTOS.h" +#include "freertos/event_groups.h" +#include "mqtt_client.h" +#include "esp_wifi.h" +#include "esp_log.h" +#include "esp_check.h" +#include "juice/juice.h" +#include "cJSON.h" +#include "peer_impl.h" + +#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1) +#define OUR_PEER "1" +#define THEIR_PEER "2" +#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2) +#define OUR_PEER "2" +#define THEIR_PEER "1" +#endif + +#define PEER_SYNC0 BIT(0) +#define PEER_SYNC1 BIT(1) +#define PEER_SYNC2 BIT(2) +#define PEER_FAIL BIT(3) +#define PEER_GATHER_DONE BIT(4) +#define PEER_DESC_PUBLISHED BIT(5) +#define PEER_CONNECTED BIT(6) + +#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL) + +#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER +#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER +#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN + +static const char *TAG = "serverless_mqtt" OUR_PEER; +static char s_buffer[MAX_BUFFER_SIZE]; +static EventGroupHandle_t s_state = NULL; +static juice_agent_t *s_agent = NULL; +static cJSON *s_peer_desc_json = NULL; +static char *s_peer_desc = NULL; +static esp_mqtt_client_handle_t s_local_mqtt = NULL; +static on_peer_recv_t s_on_recv = NULL; + +char *wifi_get_ipv4(wifi_interface_t interface); +static esp_err_t sync_peers(void); +static esp_err_t create_candidates(void); + +void peer_get_buffer(char ** buffer, size_t *buffer_len) +{ + if (buffer && buffer_len) { + *buffer = s_buffer; + *buffer_len = MAX_BUFFER_SIZE; + } +} + +void peer_send(char* data, size_t size) +{ + juice_send(s_agent, data, size); +} + +esp_err_t peer_init(on_peer_recv_t cb) +{ + esp_err_t ret = ESP_FAIL; + ESP_GOTO_ON_FALSE(cb, ESP_ERR_INVALID_ARG, err, TAG, "Invalid peer receive callback"); + s_on_recv = cb; + ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates"); + ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer"); + EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); + if (bits & PEER_CONNECTED) { + ESP_LOGI(TAG, "Peer is connected!"); + return ESP_OK; + } +err: + ESP_LOGE(TAG, "Failed to init peer"); + return ret; +} + +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) +{ + esp_mqtt_event_handle_t event = event_data; + esp_mqtt_client_handle_t client = event->client; + switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_CONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) { + ESP_LOGE(TAG, "Failed to subscribe to the sync topic"); + } + xEventGroupSetBits(s_state, PEER_SYNC0); + break; + case MQTT_EVENT_DISCONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); + xEventGroupSetBits(s_state, PEER_FAIL); + break; + + case MQTT_EVENT_DATA: + ESP_LOGI(TAG, "MQTT_EVENT_DATA"); + printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); + printf("DATA=%.*s\r\n", event->data_len, event->data); + if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) { + break; + } + EventBits_t bits = xEventGroupGetBits(s_state); + if (event->data_len > 1 && s_agent) { + cJSON *root = cJSON_Parse(event->data); + if (root == NULL) { + break; + } + cJSON *desc = cJSON_GetObjectItem(root, "desc"); + if (desc == NULL) { + cJSON_Delete(root); + break; + } + printf("desc->valuestring:%s\n", desc->valuestring); + juice_set_remote_description(s_agent, desc->valuestring); + char cand_name[] = "cand0"; + while (true) { + cJSON *cand = cJSON_GetObjectItem(root, cand_name); + if (cand == NULL) { + break; + } + printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring); + juice_add_remote_candidate(s_agent, cand->valuestring); + cand_name[4]++; + } + cJSON_Delete(root); + xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process + // and destroy the mqtt client + } +#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 + if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) { + if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) { + xEventGroupSetBits(s_state, PEER_SYNC2); + } else { + xEventGroupSetBits(s_state, PEER_FAIL); + } + } +#else + if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) { + if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) { + xEventGroupSetBits(s_state, PEER_SYNC1); + } else { + xEventGroupSetBits(s_state, PEER_FAIL); + } + } else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) { + xEventGroupSetBits(s_state, PEER_SYNC2); + } +#endif + break; + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + xEventGroupSetBits(s_state, PEER_FAIL); + break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; + } +} + +static esp_err_t sync_peers(void) +{ + esp_err_t ret = ESP_OK; + esp_mqtt_client_config_t mqtt_cfg = { + .broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI, + .task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE, + }; + esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); + ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL), + err, TAG, "Failed to register mqtt event handler"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client"); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), + ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); + ESP_LOGI(TAG, "Waiting for the other peer..."); + const int max_sync_retry = 60; + int retry = 0; + while (true) { + EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000)); + if (bits & PEER_SYNC2) { + break; + } + if (bits & PEER_SYNC1) { + continue; + } + ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer"); + ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry); +#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 + ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0, + ESP_FAIL, TAG, "Failed to publish mqtt message"); +#endif + } + ESP_LOGI(TAG, "Sync done"); + ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0, + ESP_FAIL, TAG, "Failed to publish peer's description"); + ESP_LOGI(TAG, "Waiting for the other peer description and candidates..."); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), + ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates"); +err: + free(s_peer_desc); + esp_mqtt_client_destroy(client); + return ret; +} + +static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr) +{ + ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state)); + if (state == JUICE_STATE_CONNECTED) { + xEventGroupSetBits(s_state, PEER_CONNECTED); + } else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) { + esp_restart(); + } +} + +static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr) +{ + static uint8_t cand_nr = 0; + if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates + char cand_name[] = "cand0"; + cand_name[4] += cand_nr++; + cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp); + } +} + +static void juice_gathering_done(juice_agent_t *agent, void *user_ptr) +{ + ESP_LOGI(TAG, "Gathering done"); + if (s_state) { + xEventGroupSetBits(s_state, PEER_GATHER_DONE); + } +} + + + +static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) +{ + if (s_local_mqtt) { + s_on_recv(data, size); + } +} + +static esp_err_t create_candidates(void) +{ + ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); + s_peer_desc_json = cJSON_CreateObject(); + esp_err_t ret = ESP_OK; + juice_set_log_level(JUICE_LOG_LEVEL_INFO); + juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER, + .bind_address = wifi_get_ipv4(WIFI_IF_STA), + .stun_server_port = 19302, + .cb_state_changed = juice_state, + .cb_candidate = juice_candidate, + .cb_gathering_done = juice_gathering_done, + .cb_recv = juice_recv, + }; + + s_agent = juice_create(&config); + ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent"); + ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS, + ESP_FAIL, err, TAG, "Failed to get local description"); + ESP_LOGI(TAG, "desc: %s", s_buffer); + cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer); + + ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS, + ESP_FAIL, err, TAG, "Failed to start gathering candidates"); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)), + ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); + s_peer_desc = cJSON_Print(s_peer_desc_json); + ESP_LOGI(TAG, "desc: %s", s_peer_desc); + cJSON_Delete(s_peer_desc_json); + return ESP_OK; + +err: + juice_destroy(s_agent); + s_agent = NULL; + cJSON_Delete(s_peer_desc_json); + s_peer_desc_json = NULL; + return ret; +} diff --git a/components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c new file mode 100644 index 0000000000..b85fda7296 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/peer_impl_webrtc.c @@ -0,0 +1,227 @@ +/* + * SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include "media_lib_os.h" +#include "freertos/FreeRTOS.h" +#include "freertos/event_groups.h" +#include "esp_log.h" +#include "esp_webrtc_defaults.h" +#include "esp_peer_default.h" +#include "common.h" +#include "esp_check.h" +#include "peer_impl.h" +#include "sdkconfig.h" + +#define WEBRTC_URL (CONFIG_EXAMPLE_WEBRTC_URL CONFIG_EXAMPLE_WEBRTC_ROOM_ID) +#define PEER_CONNECTED BIT(0) +#define PEER_DISCONNECTED BIT(1) +#define MAX_BUFFER_SIZE (4*1024) + +static EventGroupHandle_t s_state = NULL; +static const char *TAG = "serverless_mqtt_webrtc"; + +void peer_get_buffer(char ** buffer, size_t *buffer_len) +{ + static char s_buffer[MAX_BUFFER_SIZE]; + if (buffer && buffer_len) { + *buffer = s_buffer; + *buffer_len = MAX_BUFFER_SIZE; + } +} + +static int start_webrtc(char *url); + +static on_peer_recv_t s_on_recv = NULL; + +esp_err_t peer_init(on_peer_recv_t cb) +{ + esp_err_t ret = ESP_OK; + s_on_recv = cb; + s_state = xEventGroupCreate(); + ESP_RETURN_ON_FALSE(s_state, ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); + ESP_GOTO_ON_FALSE(start_webrtc(WEBRTC_URL) == ESP_PEER_ERR_NONE, ESP_FAIL, err, TAG, "Failed to start webRTC"); + + EventBits_t bits = xEventGroupWaitBits(s_state, PEER_DISCONNECTED | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); + if (bits & PEER_CONNECTED) { + ESP_LOGI(TAG, "Peer is connected!"); + return ret; + } +err: + vEventGroupDelete(s_state); + return ret; +} + +static int stop_webrtc(void); + +static esp_peer_signaling_handle_t signaling = NULL; +static esp_peer_handle_t peer = NULL; +static bool peer_running = false; + +static int peer_state_handler(esp_peer_state_t state, void* ctx) +{ + if (state == ESP_PEER_STATE_CONNECTED) { + xEventGroupSetBits(s_state, PEER_CONNECTED); + } else if (state == ESP_PEER_STATE_DISCONNECTED) { + xEventGroupSetBits(s_state, PEER_DISCONNECTED); + } + return 0; +} + +static int peer_msg_handler(esp_peer_msg_t* msg, void* ctx) +{ + if (msg->type == ESP_PEER_MSG_TYPE_SDP) { + // Send local SDP to signaling server + esp_peer_signaling_send_msg(signaling, (esp_peer_signaling_msg_t *)msg); + } + return 0; +} + +static int peer_video_info_handler(esp_peer_video_stream_info_t* info, void* ctx) +{ + return 0; +} + +static int peer_audio_info_handler(esp_peer_audio_stream_info_t* info, void* ctx) +{ + return 0; +} + +static int peer_audio_data_handler(esp_peer_audio_frame_t* frame, void* ctx) +{ + ESP_LOGI(TAG, "Audio Sequence %d(%d)", (int)frame->pts, (int)frame->data[0]); + return 0; +} + +static int peer_video_data_handler(esp_peer_video_frame_t* frame, void* ctx) +{ + return 0; +} + +static int peer_data_handler(esp_peer_data_frame_t* frame, void* ctx) +{ + if (frame && frame->size > 0) { + s_on_recv((char*)frame->data, frame->size); + } + return 0; +} + +static void pc_task(void *arg) +{ + while (peer_running) { + esp_peer_main_loop(peer); + media_lib_thread_sleep(20); + } + media_lib_thread_destroy(NULL); +} + +static int signaling_ice_info_handler(esp_peer_signaling_ice_info_t* info, void* ctx) +{ + if (peer == NULL) { + esp_peer_default_cfg_t peer_cfg = { + .agent_recv_timeout = 500, + }; + esp_peer_cfg_t cfg = { + .server_lists = &info->server_info, + .server_num = 1, + .audio_dir = ESP_PEER_MEDIA_DIR_SEND_RECV, + .audio_info = { + .codec = ESP_PEER_AUDIO_CODEC_G711A, + }, + .enable_data_channel = true, + .role = info->is_initiator ? ESP_PEER_ROLE_CONTROLLING : ESP_PEER_ROLE_CONTROLLED, + .on_state = peer_state_handler, + .on_msg = peer_msg_handler, + .on_video_info = peer_video_info_handler, + .on_audio_info = peer_audio_info_handler, + .on_video_data = peer_video_data_handler, + .on_audio_data = peer_audio_data_handler, + .on_data = peer_data_handler, + .ctx = ctx, + .extra_cfg = &peer_cfg, + .extra_size = sizeof(esp_peer_default_cfg_t), + }; + int ret = esp_peer_open(&cfg, esp_peer_get_default_impl(), &peer); + if (ret != ESP_PEER_ERR_NONE) { + return ret; + } + media_lib_thread_handle_t thread = NULL; + peer_running = true; + media_lib_thread_create_from_scheduler(&thread, "pc_task", pc_task, NULL); + if (thread == NULL) { + peer_running = false; + } + } + return 0; +} + +static int signaling_connected_handler(void* ctx) +{ + if (peer) { + return esp_peer_new_connection(peer); + } + return 0; +} + +static int signaling_msg_handler(esp_peer_signaling_msg_t* msg, void* ctx) +{ + if (msg->type == ESP_PEER_SIGNALING_MSG_BYE) { + esp_peer_close(peer); + peer = NULL; + } else if (msg->type == ESP_PEER_SIGNALING_MSG_SDP) { + // Receive remote SDP + if (peer) { + esp_peer_send_msg(peer, (esp_peer_msg_t*)msg); + } + } + return 0; +} + +static int signaling_close_handler(void *ctx) +{ + return 0; +} + +static int start_signaling(char* url) +{ + esp_peer_signaling_cfg_t cfg = { + .signal_url = url, + .on_ice_info = signaling_ice_info_handler, + .on_connected = signaling_connected_handler, + .on_msg = signaling_msg_handler, + .on_close = signaling_close_handler, + }; + // Use APPRTC signaling + return esp_peer_signaling_start(&cfg, esp_signaling_get_apprtc_impl(), &signaling); +} + +static int start_webrtc(char *url) +{ + stop_webrtc(); + return start_signaling(url); +} + +static int stop_webrtc(void) +{ + peer_running = false; + if (peer) { + esp_peer_close(peer); + peer = NULL; + } + if (signaling) { + esp_peer_signaling_stop(signaling); + signaling = NULL; + } + return 0; +} + +void peer_send(char* data, size_t size) +{ + esp_peer_data_frame_t data_frame = { + .type = ESP_PEER_DATA_CHANNEL_DATA, + .data = (uint8_t*)data, + .size = size, + }; + esp_peer_send_data(peer, &data_frame); +} diff --git a/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c b/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c index 8dd0bee7c7..bd8b469c6d 100644 --- a/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c +++ b/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD + * SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD * * SPDX-License-Identifier: Unlicense OR CC0-1.0 */ @@ -13,30 +13,9 @@ #include "esp_check.h" #include "esp_sleep.h" #include "mosq_broker.h" -#include "juice/juice.h" -#include "cJSON.h" +#include "peer_impl.h" -#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1) -#define OUR_PEER "1" -#define THEIR_PEER "2" -#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2) -#define OUR_PEER "2" -#define THEIR_PEER "1" -#endif - -#define PEER_SYNC0 BIT(0) -#define PEER_SYNC1 BIT(1) -#define PEER_SYNC2 BIT(2) -#define PEER_FAIL BIT(3) -#define PEER_GATHER_DONE BIT(4) -#define PEER_DESC_PUBLISHED BIT(5) -#define PEER_CONNECTED BIT(6) - -#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL) - -#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER -#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER -#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN +#define ALIGN(size) (((size) + 3U) & ~(3U)) typedef struct message_wrap { uint16_t topic_len; @@ -44,196 +23,18 @@ typedef struct message_wrap { char data[]; } __attribute__((packed)) message_wrap_t; -static const char *TAG = "serverless_mqtt" OUR_PEER; -static char s_buffer[MAX_BUFFER_SIZE]; -static EventGroupHandle_t s_state = NULL; -static juice_agent_t *s_agent = NULL; -static cJSON *s_peer_desc_json = NULL; -static char *s_peer_desc = NULL; +static const char *TAG = "serverless_mqtt"; + static esp_mqtt_client_handle_t s_local_mqtt = NULL; char *wifi_get_ipv4(wifi_interface_t interface); esp_err_t wifi_connect(void); -static esp_err_t sync_peers(void); -static esp_err_t create_candidates(void); static esp_err_t create_local_client(void); static esp_err_t create_local_broker(void); -void app_main(void) -{ - __attribute__((__unused__)) esp_err_t ret; - ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi"); - ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker"); - ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates"); - ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer"); - EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); - if (bits & PEER_CONNECTED) { - ESP_LOGI(TAG, "Peer is connected!"); - ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client"); - ESP_LOGI(TAG, "Everything is ready, exiting main task"); - return; - } -err: - ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)"); - esp_deep_sleep(1000000LL * (esp_random() % 20)); -} - -static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) -{ - esp_mqtt_event_handle_t event = event_data; - esp_mqtt_client_handle_t client = event->client; - switch ((esp_mqtt_event_id_t)event_id) { - case MQTT_EVENT_CONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); - if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) { - ESP_LOGE(TAG, "Failed to subscribe to the sync topic"); - } - xEventGroupSetBits(s_state, PEER_SYNC0); - break; - case MQTT_EVENT_DISCONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); - xEventGroupSetBits(s_state, PEER_FAIL); - break; - - case MQTT_EVENT_DATA: - ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); - printf("DATA=%.*s\r\n", event->data_len, event->data); - if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) { - break; - } - EventBits_t bits = xEventGroupGetBits(s_state); - if (event->data_len > 1 && s_agent) { - cJSON *root = cJSON_Parse(event->data); - if (root == NULL) { - break; - } - cJSON *desc = cJSON_GetObjectItem(root, "desc"); - if (desc == NULL) { - cJSON_Delete(root); - break; - } - printf("desc->valuestring:%s\n", desc->valuestring); - juice_set_remote_description(s_agent, desc->valuestring); - char cand_name[] = "cand0"; - while (true) { - cJSON *cand = cJSON_GetObjectItem(root, cand_name); - if (cand == NULL) { - break; - } - printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring); - juice_add_remote_candidate(s_agent, cand->valuestring); - cand_name[4]++; - } - cJSON_Delete(root); - xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process - // and destroy the mqtt client - } -#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 - if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) { - if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) { - xEventGroupSetBits(s_state, PEER_SYNC2); - } else { - xEventGroupSetBits(s_state, PEER_FAIL); - } - } -#else - if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) { - if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) { - xEventGroupSetBits(s_state, PEER_SYNC1); - } else { - xEventGroupSetBits(s_state, PEER_FAIL); - } - } else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) { - xEventGroupSetBits(s_state, PEER_SYNC2); - } -#endif - break; - case MQTT_EVENT_ERROR: - ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); - xEventGroupSetBits(s_state, PEER_FAIL); - break; - default: - ESP_LOGI(TAG, "Other event id:%d", event->event_id); - break; - } -} +esp_err_t peer_init(on_peer_recv_t cb); -static esp_err_t sync_peers(void) -{ - esp_err_t ret = ESP_OK; - esp_mqtt_client_config_t mqtt_cfg = { - .broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI, - .task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE, - }; - esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); - ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client"); - ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL), - err, TAG, "Failed to register mqtt event handler"); - ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client"); - ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), - ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); - ESP_LOGI(TAG, "Waiting for the other peer..."); - const int max_sync_retry = 60; - int retry = 0; - while (true) { - EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000)); - if (bits & PEER_SYNC2) { - break; - } - if (bits & PEER_SYNC1) { - continue; - } - ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer"); - ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry); -#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 - ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0, - ESP_FAIL, TAG, "Failed to publish mqtt message"); -#endif - } - ESP_LOGI(TAG, "Sync done"); - ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0, - ESP_FAIL, TAG, "Failed to publish peer's description"); - ESP_LOGI(TAG, "Waiting for the other peer description and candidates..."); - ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), - ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates"); -err: - free(s_peer_desc); - esp_mqtt_client_destroy(client); - return ret; -} - -static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr) -{ - ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state)); - if (state == JUICE_STATE_CONNECTED) { - xEventGroupSetBits(s_state, PEER_CONNECTED); - } else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) { - esp_restart(); - } -} - -static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr) -{ - static uint8_t cand_nr = 0; - if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates - char cand_name[] = "cand0"; - cand_name[4] += cand_nr++; - cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp); - } -} - -static void juice_gathering_done(juice_agent_t *agent, void *user_ptr) -{ - ESP_LOGI(TAG, "Gathering done"); - if (s_state) { - xEventGroupSetBits(s_state, PEER_GATHER_DONE); - } -} - -#define ALIGN(size) (((size) + 3U) & ~(3U)) - -static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) +static void peer_recv(const char *data, size_t size) { if (s_local_mqtt) { message_wrap_t *message = (message_wrap_t *)data; @@ -250,45 +51,21 @@ static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void ESP_LOGI(TAG, "forwarding remote message: payload:%.*s", payload_len, payload); esp_mqtt_client_publish(s_local_mqtt, topic, payload, payload_len, 0, 0); } + } -static esp_err_t create_candidates(void) +void app_main(void) { - ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); - s_peer_desc_json = cJSON_CreateObject(); - esp_err_t ret = ESP_OK; - juice_set_log_level(JUICE_LOG_LEVEL_INFO); - juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER, - .bind_address = wifi_get_ipv4(WIFI_IF_STA), - .stun_server_port = 19302, - .cb_state_changed = juice_state, - .cb_candidate = juice_candidate, - .cb_gathering_done = juice_gathering_done, - .cb_recv = juice_recv, - }; - - s_agent = juice_create(&config); - ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent"); - ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS, - ESP_FAIL, err, TAG, "Failed to get local description"); - ESP_LOGI(TAG, "desc: %s", s_buffer); - cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer); - - ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS, - ESP_FAIL, err, TAG, "Failed to start gathering candidates"); - ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)), - ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); - s_peer_desc = cJSON_Print(s_peer_desc_json); - ESP_LOGI(TAG, "desc: %s", s_peer_desc); - cJSON_Delete(s_peer_desc_json); - return ESP_OK; - + __attribute__((__unused__)) esp_err_t ret; + ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi"); + ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker"); + ESP_GOTO_ON_ERROR(peer_init(peer_recv), err, TAG, "Failed to init peer library"); + ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client"); + ESP_LOGI(TAG, "Everything is ready, exiting main task"); + return; err: - juice_destroy(s_agent); - s_agent = NULL; - cJSON_Delete(s_peer_desc_json); - s_peer_desc_json = NULL; - return ret; + ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)"); + esp_deep_sleep(1000000LL * (esp_random() % 20)); } static void local_handler(void *args, esp_event_base_t base, int32_t id, void *data) @@ -335,18 +112,26 @@ static esp_err_t create_local_client(void) static void handle_message(char *client, char *topic, char *payload, int len, int qos, int retain) { - if (client && strcmp(client, "local_mqtt") == 0 ) { + if (client && strcmp(client, "local_mqtt") == 0) { // This is our little local client -- do not forward return; } ESP_LOGI(TAG, "handle_message topic:%s", topic); ESP_LOGI(TAG, "handle_message data:%.*s", len, payload); ESP_LOGI(TAG, "handle_message qos=%d, retain=%d", qos, retain); - if (s_local_mqtt && s_agent) { + if (s_local_mqtt) { + static char *s_buffer = NULL; + static size_t s_buffer_len = 0; + if (s_buffer == NULL || s_buffer_len == 0) { + peer_get_buffer(&s_buffer, &s_buffer_len); + if (s_buffer == NULL || s_buffer_len == 0) { + return; + } + } int topic_len = strlen(topic) + 1; // null term int topic_len_aligned = ALIGN(topic_len); int total_msg_len = 2 + 2 /* msg_wrap header */ + topic_len_aligned + len; - if (total_msg_len > MAX_BUFFER_SIZE) { + if (total_msg_len > s_buffer_len) { ESP_LOGE(TAG, "Fail to forward, message too long"); return; } @@ -356,7 +141,7 @@ static void handle_message(char *client, char *topic, char *payload, int len, in memcpy(s_buffer + 4, topic, topic_len); memcpy(s_buffer + 4 + topic_len_aligned, payload, len); - juice_send(s_agent, s_buffer, total_msg_len); + peer_send(s_buffer, total_msg_len); } } diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default new file mode 100644 index 0000000000..4c23f9c136 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.default @@ -0,0 +1,2 @@ +CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=n +CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=y diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer new file mode 100644 index 0000000000..a949251a35 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.ci.esp_peer @@ -0,0 +1,3 @@ +CONFIG_IDF_TARGET="esp32" +CONFIG_EXAMPLE_PEER_LIB_ESP_PEER=y +CONFIG_EXAMPLE_PEER_LIB_LIBJUICE=n diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults b/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults index 037b680153..f0fd9cde6d 100644 --- a/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults @@ -1,3 +1,6 @@ CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y CONFIG_ESP_MAIN_TASK_STACK_SIZE=16384 CONFIG_PTHREAD_TASK_STACK_SIZE_DEFAULT=32768 +CONFIG_LWIP_SNTP_MAX_SERVERS=2 +CONFIG_MBEDTLS_SSL_PROTO_DTLS=y +CONFIG_MBEDTLS_SSL_DTLS_SRTP=y