From 954ad42f268635c7cfd1bd64c9629fc073ced36d Mon Sep 17 00:00:00 2001 From: JacobBarthelmeh Date: Tue, 20 May 2025 09:57:01 -0600 Subject: [PATCH 1/4] add port for NETX --- examples/mqttexample.c | 2 +- examples/mqttexample.h | 3 +- examples/mqttnet.c | 182 +++++++++++++++++++++++++++++++++++++++++ examples/mqttnet.h | 5 ++ examples/mqttport.h | 7 ++ 5 files changed, 197 insertions(+), 2 deletions(-) diff --git a/examples/mqttexample.c b/examples/mqttexample.c index e37af3f9..3d9539ab 100644 --- a/examples/mqttexample.c +++ b/examples/mqttexample.c @@ -584,7 +584,7 @@ int mqtt_check_timeout(int rc, word32* start_sec, word32 timeout_sec) #endif /* WOLFMQTT_NONBLOCK */ -#ifdef ENABLE_MQTT_TLS +#if defined(ENABLE_MQTT_TLS) && !defined(EXTERNAL_MQTT_TLS_CALLBACK) #ifdef WOLFSSL_ENCRYPTED_KEYS int mqtt_password_cb(char* passwd, int sz, int rw, void* userdata) diff --git a/examples/mqttexample.h b/examples/mqttexample.h index d6d945ea..92f87ee3 100644 --- a/examples/mqttexample.h +++ b/examples/mqttexample.h @@ -98,6 +98,7 @@ #endif /* certs are either static or extern, depending on the specific example */ +#ifndef EXTERNAL_MQTT_TLS_CALLBACK #ifdef WOLFMQTT_EXTERN_CERT #undef WOLFMQTT_EXAMPLE_CERT #define WOLFMQTT_EXAMPLE_CERT /* init extern from mqttexample.h */ @@ -108,7 +109,7 @@ #undef WOLFMQTT_EXAMPLE_CERT #define WOLFMQTT_EXAMPLE_CERT static #endif - +#endif /* !EXTERNAL_MQTT_TLS_CALLBACK */ /* MQTT Client state */ typedef enum _MQTTCtxState { WMQ_BEGIN = 0, diff --git a/examples/mqttnet.c b/examples/mqttnet.c index 347ac67d..a13e8d31 100644 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -378,6 +378,186 @@ static int NetRead(void *context, byte* buf, int buf_len, } +/* -------------------------------------------------------------------------- */ +/* NETX SOCKET BACKEND EXAMPLE */ +/* -------------------------------------------------------------------------- */ +#elif defined(HAVE_NETX) + +static int NetDisconnect(void *context) +{ + SocketContext *sock = (SocketContext*)context; + if (sock) { + nx_tcp_socket_disconnect(&sock->fd, NX_NO_WAIT); + nx_tcp_socket_delete(&sock->fd); + + sock->stat = SOCK_BEGIN; + } + return 0; +} + +static int NetConnect(void *context, const char* host, word16 port, + int timeout_ms) +{ + SocketContext *sock = (SocketContext*)context; + int rc = MQTT_CODE_ERROR_NETWORK; + MQTTCtx* mqttCtx = sock->mqttCtx; + UINT status; + NXD_ADDRESS ipAddress; + + /* Get address information for host and locate IPv4 */ + switch(sock->stat) { + case SOCK_BEGIN: + { + if (mqttCtx->debug_on) { + PRINTF("NetConnect: Host %s, Port %u, Timeout %d ms, " + "Use TLS %d", host, port, timeout_ms, mqttCtx->use_tls); + } + #ifdef HAVE_NETX_DNS + /* Convert hostname to IP address using NETX DUO DNS */ + status = nxd_dns_host_by_name_get(sock->ipPtr, (UCHAR *)host, &ipAddress, timeout_ms); + if (status != NX_SUCCESS) { + PRINTF("DNS lookup failed: %d", status); + return MQTT_CODE_ERROR_NETWORK; + } + #else + PRINTF("DNS lookup not avilable"); + return MQTT_CODE_ERROR_NETWORK; + #endif + status = nx_tcp_socket_create(sock->ipPtr, &sock->fd, + "MQTT Socket", NX_IP_NORMAL, + NX_FRAGMENT_OKAY, NX_IP_TIME_TO_LIVE, + 1024, NX_NULL, NX_NULL); + if (status != NX_SUCCESS) { + PRINTF("Socket create failed: %d", status); + return MQTT_CODE_ERROR_NETWORK; + } + + /* Bind the socket to a local port */ + status = nx_tcp_client_socket_bind(&sock->fd, port, NX_WAIT_FOREVER); + if (status != NX_SUCCESS) { + PRINTF("Socket bind failed: %d", status); + return MQTT_CODE_ERROR_NETWORK; + } + + sock->stat = SOCK_CONN; + } + FALL_THROUGH; + + case SOCK_CONN: + { + /* Connect to server using NETX DUO */ + status = nxd_tcp_client_socket_connect(&sock->fd, &ipAddress, port, timeout_ms); + if (status != NX_SUCCESS) { + if (status == NX_WAIT_ABORTED) { + return MQTT_CODE_CONTINUE; + } + PRINTF("Socket connect failed: %d", status); + NetDisconnect(context); + return MQTT_CODE_ERROR_NETWORK; + } + return MQTT_CODE_SUCCESS; + } + + default: + rc = MQTT_CODE_ERROR_BAD_ARG; + break; + } /* switch */ + + return rc; +} + + +static int NetWrite(void *context, const byte* buf, int buf_len, + int timeout_ms) +{ + SocketContext *sock = (SocketContext*)context; + NX_PACKET* packet; + NX_PACKET_POOL* pool; /* shorthand */ + UINT status; + + if (sock == NULL) { + PRINTF("NetX Send NULL parameters"); + return MQTT_CODE_ERROR_BAD_ARG; + } + + pool = sock->fd.nx_tcp_socket_ip_ptr->nx_ip_default_packet_pool; + status = nx_packet_allocate(pool, &packet, NX_TCP_PACKET, + timeout_ms); + if (status != NX_SUCCESS) { + PRINTF("NetX Send packet alloc error"); + return MQTT_CODE_ERROR_NETWORK; + } + + status = nx_packet_data_append(packet, (VOID*)buf, buf_len, pool, timeout_ms); + if (status != NX_SUCCESS) { + nx_packet_release(packet); + PRINTF("NetX Send data append error"); + return MQTT_CODE_ERROR_NETWORK; + } + + status = nx_tcp_socket_send(&sock->fd, packet, timeout_ms); + if (status != NX_SUCCESS) { + nx_packet_release(packet); + PRINTF("NetX Send socket send error"); + return MQTT_CODE_ERROR_NETWORK; + } + + return buf_len; +} + + +static int NetRead(void *context, byte* buf, int buf_len, + int timeout_ms) +{ + SocketContext *sock = (SocketContext*)context; + ULONG left; + ULONG total; + ULONG copied = 0; + UINT status; + + if (sock == NULL) { + PRINTF("NetX Recv NULL parameters"); + return MQTT_CODE_ERROR_BAD_ARG; + } + + if (sock->nxPacket == NULL) { + status = nx_tcp_socket_receive(&sock->fd, &sock->nxPacket, + timeout_ms); + if (status != NX_SUCCESS) { + PRINTF("NetX Recv receive error"); + return MQTT_CODE_ERROR_NETWORK; + } + } + + if (sock->nxPacket) { + status = nx_packet_length_get(sock->nxPacket, &total); + if (status != NX_SUCCESS) { + PRINTF("NetX Recv length get error"); + return MQTT_CODE_ERROR_NETWORK; + } + + left = total - sock->nxOffset; + status = nx_packet_data_extract_offset(sock->nxPacket, sock->nxOffset, + buf, buf_len, &copied); + if (status != NX_SUCCESS) { + PRINTF("NetX Recv data extract offset error"); + return MQTT_CODE_ERROR_NETWORK; + } + + sock->nxOffset += copied; + + if (copied == left) { + PRINTF("NetX Recv Drained packet"); + nx_packet_release(sock->nxPacket); + sock->nxPacket = NULL; + sock->nxOffset = 0; + } + } + + return copied; +} + + /* -------------------------------------------------------------------------- */ /* CURL EASY SOCKET BACKEND EXAMPLE */ /* -------------------------------------------------------------------------- */ @@ -1606,7 +1786,9 @@ int MqttClientNet_Init(MqttNet* net, MQTTCtx* mqttCtx) #if defined(ENABLE_MQTT_CURL) sockCtx->curl = NULL; #endif +#if !defined(HAVE_NETX) sockCtx->fd = SOCKET_INVALID; +#endif sockCtx->stat = SOCK_BEGIN; sockCtx->mqttCtx = mqttCtx; diff --git a/examples/mqttnet.h b/examples/mqttnet.h index b50bb6e2..f87d58f8 100644 --- a/examples/mqttnet.h +++ b/examples/mqttnet.h @@ -55,6 +55,11 @@ typedef struct _SocketContext { #endif #ifdef ENABLE_MQTT_WEBSOCKET void* websocket_ctx; +#endif +#ifdef HAVE_NETX + NX_IP *ipPtr; + NX_PACKET *nxPacket; + ULONG nxOffset; #endif MQTTCtx* mqttCtx; } SocketContext; diff --git a/examples/mqttport.h b/examples/mqttport.h index 5dd65944..6395cfa5 100644 --- a/examples/mqttport.h +++ b/examples/mqttport.h @@ -77,6 +77,13 @@ extern "C" { #elif defined(WOLFMQTT_USER_IO) #include "userio_template.h" +/* NetX */ +#elif defined(HAVE_NETX) + #include "nx_api.h" + + #define SOCKET_T NX_TCP_SOCKET + #define SOCK_ADDR_IN NXD_ADDRESS + /* Windows */ #elif defined(USE_WINDOWS_API) #include From 53d4fb972b0c329d803c11e0512e2653e37e53ee Mon Sep 17 00:00:00 2001 From: JacobBarthelmeh Date: Tue, 20 May 2025 10:44:16 -0600 Subject: [PATCH 2/4] formatting, spelling fix, rename DNS support macro guard --- examples/mqttnet.c | 44 ++++++++++++++++++++------------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/examples/mqttnet.c b/examples/mqttnet.c index a13e8d31..c0c7926f 100644 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -412,28 +412,29 @@ static int NetConnect(void *context, const char* host, word16 port, PRINTF("NetConnect: Host %s, Port %u, Timeout %d ms, " "Use TLS %d", host, port, timeout_ms, mqttCtx->use_tls); } - #ifdef HAVE_NETX_DNS + #ifndef WOLFMQTT_NO_NETX_DNS /* Convert hostname to IP address using NETX DUO DNS */ - status = nxd_dns_host_by_name_get(sock->ipPtr, (UCHAR *)host, &ipAddress, timeout_ms); + status = nxd_dns_host_by_name_get(sock->ipPtr, (UCHAR *)host, + &ipAddress, timeout_ms); if (status != NX_SUCCESS) { PRINTF("DNS lookup failed: %d", status); return MQTT_CODE_ERROR_NETWORK; } #else - PRINTF("DNS lookup not avilable"); - return MQTT_CODE_ERROR_NETWORK; + PRINTF("DNS lookup not available"); + return MQTT_CODE_ERROR_NETWORK; #endif - status = nx_tcp_socket_create(sock->ipPtr, &sock->fd, - "MQTT Socket", NX_IP_NORMAL, - NX_FRAGMENT_OKAY, NX_IP_TIME_TO_LIVE, - 1024, NX_NULL, NX_NULL); + status = nx_tcp_socket_create(sock->ipPtr, &sock->fd, + "MQTT Socket", NX_IP_NORMAL, + NX_FRAGMENT_OKAY, NX_IP_TIME_TO_LIVE, + 1024, NX_NULL, NX_NULL); if (status != NX_SUCCESS) { PRINTF("Socket create failed: %d", status); return MQTT_CODE_ERROR_NETWORK; } /* Bind the socket to a local port */ - status = nx_tcp_client_socket_bind(&sock->fd, port, NX_WAIT_FOREVER); + status = nx_tcp_client_socket_bind(&sock->fd, port, timeout_ms); if (status != NX_SUCCESS) { PRINTF("Socket bind failed: %d", status); return MQTT_CODE_ERROR_NETWORK; @@ -446,11 +447,9 @@ static int NetConnect(void *context, const char* host, word16 port, case SOCK_CONN: { /* Connect to server using NETX DUO */ - status = nxd_tcp_client_socket_connect(&sock->fd, &ipAddress, port, timeout_ms); + status = nxd_tcp_client_socket_connect(&sock->fd, &ipAddress, port, + timeout_ms); if (status != NX_SUCCESS) { - if (status == NX_WAIT_ABORTED) { - return MQTT_CODE_CONTINUE; - } PRINTF("Socket connect failed: %d", status); NetDisconnect(context); return MQTT_CODE_ERROR_NETWORK; @@ -467,8 +466,7 @@ static int NetConnect(void *context, const char* host, word16 port, } -static int NetWrite(void *context, const byte* buf, int buf_len, - int timeout_ms) +static int NetWrite(void *context, const byte* buf, int buf_len, int timeout_ms) { SocketContext *sock = (SocketContext*)context; NX_PACKET* packet; @@ -481,14 +479,14 @@ static int NetWrite(void *context, const byte* buf, int buf_len, } pool = sock->fd.nx_tcp_socket_ip_ptr->nx_ip_default_packet_pool; - status = nx_packet_allocate(pool, &packet, NX_TCP_PACKET, - timeout_ms); + status = nx_packet_allocate(pool, &packet, NX_TCP_PACKET, timeout_ms); if (status != NX_SUCCESS) { PRINTF("NetX Send packet alloc error"); return MQTT_CODE_ERROR_NETWORK; } - status = nx_packet_data_append(packet, (VOID*)buf, buf_len, pool, timeout_ms); + status = nx_packet_data_append(packet, (VOID*)buf, buf_len, pool, + timeout_ms); if (status != NX_SUCCESS) { nx_packet_release(packet); PRINTF("NetX Send data append error"); @@ -506,8 +504,7 @@ static int NetWrite(void *context, const byte* buf, int buf_len, } -static int NetRead(void *context, byte* buf, int buf_len, - int timeout_ms) +static int NetRead(void *context, byte* buf, int buf_len, int timeout_ms) { SocketContext *sock = (SocketContext*)context; ULONG left; @@ -521,8 +518,7 @@ static int NetRead(void *context, byte* buf, int buf_len, } if (sock->nxPacket == NULL) { - status = nx_tcp_socket_receive(&sock->fd, &sock->nxPacket, - timeout_ms); + status = nx_tcp_socket_receive(&sock->fd, &sock->nxPacket, timeout_ms); if (status != NX_SUCCESS) { PRINTF("NetX Recv receive error"); return MQTT_CODE_ERROR_NETWORK; @@ -537,8 +533,8 @@ static int NetRead(void *context, byte* buf, int buf_len, } left = total - sock->nxOffset; - status = nx_packet_data_extract_offset(sock->nxPacket, sock->nxOffset, - buf, buf_len, &copied); + status = nx_packet_data_extract_offset(sock->nxPacket, + sock->nxOffset, buf, buf_len, &copied); if (status != NX_SUCCESS) { PRINTF("NetX Recv data extract offset error"); return MQTT_CODE_ERROR_NETWORK; From 1a1680eb3df8c9965bebe66f1e4734fd4d126b5f Mon Sep 17 00:00:00 2001 From: JacobBarthelmeh Date: Wed, 21 May 2025 17:48:13 -0600 Subject: [PATCH 3/4] add THREADX semaphore support for multithreaded cases --- src/mqtt_client.c | 29 +++++++++++++++++++++++++++++ wolfmqtt/mqtt_types.h | 4 ++++ 2 files changed, 33 insertions(+) diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 26479479..7de2e1c0 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -186,6 +186,35 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg); return 0; } +#elif defined(THREADX) + /* ThreadX semaphore */ + int wm_SemInit(wm_Sem *s) { + if (tx_semaphore_create(s, NULL, 1) != TX_SUCCESS) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); + } + return 0; + } + int wm_SemFree(wm_Sem *s) { + if (tx_semaphore_delete(s) != TX_SUCCESS) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); + } + return 0; + } + + static UINT semstatus; + int wm_SemLock(wm_Sem *s) { + semstatus = tx_semaphore_get(s, TX_WAIT_FOREVER); + if (semstatus != TX_SUCCESS) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); + } + return 0; + } + int wm_SemUnlock(wm_Sem *s) { + if (tx_semaphore_put(s) != TX_SUCCESS) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); + } + return 0; + } #endif /* MUTEX */ #endif /* WOLFMQTT_MULTITHREAD */ diff --git a/wolfmqtt/mqtt_types.h b/wolfmqtt/mqtt_types.h index 2c820c99..fa166e58 100644 --- a/wolfmqtt/mqtt_types.h +++ b/wolfmqtt/mqtt_types.h @@ -126,6 +126,10 @@ #include typedef HANDLE wm_Sem; + #elif defined(THREADX) + #include + typedef TX_SEMAPHORE wm_Sem; + #else #error "Multithreading requires binary semaphore implementation!" #endif From 299d07e0ba9d1f2e65f581ae56f52e47f39ccbe9 Mon Sep 17 00:00:00 2001 From: JacobBarthelmeh Date: Wed, 28 May 2025 10:07:16 -0600 Subject: [PATCH 4/4] fix for handling socket timeout with NETX --- examples/mqttnet.c | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/examples/mqttnet.c b/examples/mqttnet.c index c0c7926f..77b11eb2 100644 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -403,6 +403,7 @@ static int NetConnect(void *context, const char* host, word16 port, MQTTCtx* mqttCtx = sock->mqttCtx; UINT status; NXD_ADDRESS ipAddress; + UINT ticks; /* Get address information for host and locate IPv4 */ switch(sock->stat) { @@ -446,9 +447,12 @@ static int NetConnect(void *context, const char* host, word16 port, case SOCK_CONN: { + /* Convert timeout_ms to ticks and round up */ + ticks = (timeout_ms * TX_TIMER_TICKS_PER_SECOND + 999) / 1000; + /* Connect to server using NETX DUO */ status = nxd_tcp_client_socket_connect(&sock->fd, &ipAddress, port, - timeout_ms); + ticks); if (status != NX_SUCCESS) { PRINTF("Socket connect failed: %d", status); NetDisconnect(context); @@ -472,6 +476,7 @@ static int NetWrite(void *context, const byte* buf, int buf_len, int timeout_ms) NX_PACKET* packet; NX_PACKET_POOL* pool; /* shorthand */ UINT status; + UINT ticks; if (sock == NULL) { PRINTF("NetX Send NULL parameters"); @@ -493,7 +498,11 @@ static int NetWrite(void *context, const byte* buf, int buf_len, int timeout_ms) return MQTT_CODE_ERROR_NETWORK; } - status = nx_tcp_socket_send(&sock->fd, packet, timeout_ms); + + /* Convert timeout_ms to ticks and round up */ + ticks = (timeout_ms * TX_TIMER_TICKS_PER_SECOND + 999) / 1000; + + status = nx_tcp_socket_send(&sock->fd, packet, ticks); if (status != NX_SUCCESS) { nx_packet_release(packet); PRINTF("NetX Send socket send error"); @@ -511,6 +520,7 @@ static int NetRead(void *context, byte* buf, int buf_len, int timeout_ms) ULONG total; ULONG copied = 0; UINT status; + UINT ticks; if (sock == NULL) { PRINTF("NetX Recv NULL parameters"); @@ -518,8 +528,14 @@ static int NetRead(void *context, byte* buf, int buf_len, int timeout_ms) } if (sock->nxPacket == NULL) { - status = nx_tcp_socket_receive(&sock->fd, &sock->nxPacket, timeout_ms); + /* Convert timeout_ms to ticks and round up */ + ticks = (timeout_ms * TX_TIMER_TICKS_PER_SECOND + 999) / 1000; + status = nx_tcp_socket_receive(&sock->fd, &sock->nxPacket, ticks); if (status != NX_SUCCESS) { + if (status == NX_NO_PACKET) { + PRINTF("NetX Recv timeout"); + return MQTT_CODE_ERROR_TIMEOUT; + } PRINTF("NetX Recv receive error"); return MQTT_CODE_ERROR_NETWORK; }