diff --git a/examples/mqttexample.c b/examples/mqttexample.c index e37af3f9..3d9539ab 100644 --- a/examples/mqttexample.c +++ b/examples/mqttexample.c @@ -584,7 +584,7 @@ int mqtt_check_timeout(int rc, word32* start_sec, word32 timeout_sec) #endif /* WOLFMQTT_NONBLOCK */ -#ifdef ENABLE_MQTT_TLS +#if defined(ENABLE_MQTT_TLS) && !defined(EXTERNAL_MQTT_TLS_CALLBACK) #ifdef WOLFSSL_ENCRYPTED_KEYS int mqtt_password_cb(char* passwd, int sz, int rw, void* userdata) diff --git a/examples/mqttexample.h b/examples/mqttexample.h index d6d945ea..92f87ee3 100644 --- a/examples/mqttexample.h +++ b/examples/mqttexample.h @@ -98,6 +98,7 @@ #endif /* certs are either static or extern, depending on the specific example */ +#ifndef EXTERNAL_MQTT_TLS_CALLBACK #ifdef WOLFMQTT_EXTERN_CERT #undef WOLFMQTT_EXAMPLE_CERT #define WOLFMQTT_EXAMPLE_CERT /* init extern from mqttexample.h */ @@ -108,7 +109,7 @@ #undef WOLFMQTT_EXAMPLE_CERT #define WOLFMQTT_EXAMPLE_CERT static #endif - +#endif /* !EXTERNAL_MQTT_TLS_CALLBACK */ /* MQTT Client state */ typedef enum _MQTTCtxState { WMQ_BEGIN = 0, diff --git a/examples/mqttnet.c b/examples/mqttnet.c index 347ac67d..77b11eb2 100644 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -378,6 +378,198 @@ static int NetRead(void *context, byte* buf, int buf_len, } +/* -------------------------------------------------------------------------- */ +/* NETX SOCKET BACKEND EXAMPLE */ +/* -------------------------------------------------------------------------- */ +#elif defined(HAVE_NETX) + +static int NetDisconnect(void *context) +{ + SocketContext *sock = (SocketContext*)context; + if (sock) { + nx_tcp_socket_disconnect(&sock->fd, NX_NO_WAIT); + nx_tcp_socket_delete(&sock->fd); + + sock->stat = SOCK_BEGIN; + } + return 0; +} + +static int NetConnect(void *context, const char* host, word16 port, + int timeout_ms) +{ + SocketContext *sock = (SocketContext*)context; + int rc = MQTT_CODE_ERROR_NETWORK; + MQTTCtx* mqttCtx = sock->mqttCtx; + UINT status; + NXD_ADDRESS ipAddress; + UINT ticks; + + /* Get address information for host and locate IPv4 */ + switch(sock->stat) { + case SOCK_BEGIN: + { + if (mqttCtx->debug_on) { + PRINTF("NetConnect: Host %s, Port %u, Timeout %d ms, " + "Use TLS %d", host, port, timeout_ms, mqttCtx->use_tls); + } + #ifndef WOLFMQTT_NO_NETX_DNS + /* Convert hostname to IP address using NETX DUO DNS */ + status = nxd_dns_host_by_name_get(sock->ipPtr, (UCHAR *)host, + &ipAddress, timeout_ms); + if (status != NX_SUCCESS) { + PRINTF("DNS lookup failed: %d", status); + return MQTT_CODE_ERROR_NETWORK; + } + #else + PRINTF("DNS lookup not available"); + return MQTT_CODE_ERROR_NETWORK; + #endif + status = nx_tcp_socket_create(sock->ipPtr, &sock->fd, + "MQTT Socket", NX_IP_NORMAL, + NX_FRAGMENT_OKAY, NX_IP_TIME_TO_LIVE, + 1024, NX_NULL, NX_NULL); + if (status != NX_SUCCESS) { + PRINTF("Socket create failed: %d", status); + return MQTT_CODE_ERROR_NETWORK; + } + + /* Bind the socket to a local port */ + status = nx_tcp_client_socket_bind(&sock->fd, port, timeout_ms); + if (status != NX_SUCCESS) { + PRINTF("Socket bind failed: %d", status); + return MQTT_CODE_ERROR_NETWORK; + } + + sock->stat = SOCK_CONN; + } + FALL_THROUGH; + + case SOCK_CONN: + { + /* Convert timeout_ms to ticks and round up */ + ticks = (timeout_ms * TX_TIMER_TICKS_PER_SECOND + 999) / 1000; + + /* Connect to server using NETX DUO */ + status = nxd_tcp_client_socket_connect(&sock->fd, &ipAddress, port, + ticks); + if (status != NX_SUCCESS) { + PRINTF("Socket connect failed: %d", status); + NetDisconnect(context); + return MQTT_CODE_ERROR_NETWORK; + } + return MQTT_CODE_SUCCESS; + } + + default: + rc = MQTT_CODE_ERROR_BAD_ARG; + break; + } /* switch */ + + return rc; +} + + +static int NetWrite(void *context, const byte* buf, int buf_len, int timeout_ms) +{ + SocketContext *sock = (SocketContext*)context; + NX_PACKET* packet; + NX_PACKET_POOL* pool; /* shorthand */ + UINT status; + UINT ticks; + + if (sock == NULL) { + PRINTF("NetX Send NULL parameters"); + return MQTT_CODE_ERROR_BAD_ARG; + } + + pool = sock->fd.nx_tcp_socket_ip_ptr->nx_ip_default_packet_pool; + status = nx_packet_allocate(pool, &packet, NX_TCP_PACKET, timeout_ms); + if (status != NX_SUCCESS) { + PRINTF("NetX Send packet alloc error"); + return MQTT_CODE_ERROR_NETWORK; + } + + status = nx_packet_data_append(packet, (VOID*)buf, buf_len, pool, + timeout_ms); + if (status != NX_SUCCESS) { + nx_packet_release(packet); + PRINTF("NetX Send data append error"); + return MQTT_CODE_ERROR_NETWORK; + } + + + /* Convert timeout_ms to ticks and round up */ + ticks = (timeout_ms * TX_TIMER_TICKS_PER_SECOND + 999) / 1000; + + status = nx_tcp_socket_send(&sock->fd, packet, ticks); + if (status != NX_SUCCESS) { + nx_packet_release(packet); + PRINTF("NetX Send socket send error"); + return MQTT_CODE_ERROR_NETWORK; + } + + return buf_len; +} + + +static int NetRead(void *context, byte* buf, int buf_len, int timeout_ms) +{ + SocketContext *sock = (SocketContext*)context; + ULONG left; + ULONG total; + ULONG copied = 0; + UINT status; + UINT ticks; + + if (sock == NULL) { + PRINTF("NetX Recv NULL parameters"); + return MQTT_CODE_ERROR_BAD_ARG; + } + + if (sock->nxPacket == NULL) { + /* Convert timeout_ms to ticks and round up */ + ticks = (timeout_ms * TX_TIMER_TICKS_PER_SECOND + 999) / 1000; + status = nx_tcp_socket_receive(&sock->fd, &sock->nxPacket, ticks); + if (status != NX_SUCCESS) { + if (status == NX_NO_PACKET) { + PRINTF("NetX Recv timeout"); + return MQTT_CODE_ERROR_TIMEOUT; + } + PRINTF("NetX Recv receive error"); + return MQTT_CODE_ERROR_NETWORK; + } + } + + if (sock->nxPacket) { + status = nx_packet_length_get(sock->nxPacket, &total); + if (status != NX_SUCCESS) { + PRINTF("NetX Recv length get error"); + return MQTT_CODE_ERROR_NETWORK; + } + + left = total - sock->nxOffset; + status = nx_packet_data_extract_offset(sock->nxPacket, + sock->nxOffset, buf, buf_len, &copied); + if (status != NX_SUCCESS) { + PRINTF("NetX Recv data extract offset error"); + return MQTT_CODE_ERROR_NETWORK; + } + + sock->nxOffset += copied; + + if (copied == left) { + PRINTF("NetX Recv Drained packet"); + nx_packet_release(sock->nxPacket); + sock->nxPacket = NULL; + sock->nxOffset = 0; + } + } + + return copied; +} + + /* -------------------------------------------------------------------------- */ /* CURL EASY SOCKET BACKEND EXAMPLE */ /* -------------------------------------------------------------------------- */ @@ -1606,7 +1798,9 @@ int MqttClientNet_Init(MqttNet* net, MQTTCtx* mqttCtx) #if defined(ENABLE_MQTT_CURL) sockCtx->curl = NULL; #endif +#if !defined(HAVE_NETX) sockCtx->fd = SOCKET_INVALID; +#endif sockCtx->stat = SOCK_BEGIN; sockCtx->mqttCtx = mqttCtx; diff --git a/examples/mqttnet.h b/examples/mqttnet.h index b50bb6e2..f87d58f8 100644 --- a/examples/mqttnet.h +++ b/examples/mqttnet.h @@ -55,6 +55,11 @@ typedef struct _SocketContext { #endif #ifdef ENABLE_MQTT_WEBSOCKET void* websocket_ctx; +#endif +#ifdef HAVE_NETX + NX_IP *ipPtr; + NX_PACKET *nxPacket; + ULONG nxOffset; #endif MQTTCtx* mqttCtx; } SocketContext; diff --git a/examples/mqttport.h b/examples/mqttport.h index 5dd65944..6395cfa5 100644 --- a/examples/mqttport.h +++ b/examples/mqttport.h @@ -77,6 +77,13 @@ extern "C" { #elif defined(WOLFMQTT_USER_IO) #include "userio_template.h" +/* NetX */ +#elif defined(HAVE_NETX) + #include "nx_api.h" + + #define SOCKET_T NX_TCP_SOCKET + #define SOCK_ADDR_IN NXD_ADDRESS + /* Windows */ #elif defined(USE_WINDOWS_API) #include diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 26479479..7de2e1c0 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -186,6 +186,35 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg); return 0; } +#elif defined(THREADX) + /* ThreadX semaphore */ + int wm_SemInit(wm_Sem *s) { + if (tx_semaphore_create(s, NULL, 1) != TX_SUCCESS) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); + } + return 0; + } + int wm_SemFree(wm_Sem *s) { + if (tx_semaphore_delete(s) != TX_SUCCESS) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); + } + return 0; + } + + static UINT semstatus; + int wm_SemLock(wm_Sem *s) { + semstatus = tx_semaphore_get(s, TX_WAIT_FOREVER); + if (semstatus != TX_SUCCESS) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); + } + return 0; + } + int wm_SemUnlock(wm_Sem *s) { + if (tx_semaphore_put(s) != TX_SUCCESS) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM); + } + return 0; + } #endif /* MUTEX */ #endif /* WOLFMQTT_MULTITHREAD */ diff --git a/wolfmqtt/mqtt_types.h b/wolfmqtt/mqtt_types.h index 2c820c99..fa166e58 100644 --- a/wolfmqtt/mqtt_types.h +++ b/wolfmqtt/mqtt_types.h @@ -126,6 +126,10 @@ #include typedef HANDLE wm_Sem; + #elif defined(THREADX) + #include + typedef TX_SEMAPHORE wm_Sem; + #else #error "Multithreading requires binary semaphore implementation!" #endif