Skip to content

Add port for NETX use #427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/mqttexample.c
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ int mqtt_check_timeout(int rc, word32* start_sec, word32 timeout_sec)
#endif /* WOLFMQTT_NONBLOCK */


#ifdef ENABLE_MQTT_TLS
#if defined(ENABLE_MQTT_TLS) && !defined(EXTERNAL_MQTT_TLS_CALLBACK)

#ifdef WOLFSSL_ENCRYPTED_KEYS
int mqtt_password_cb(char* passwd, int sz, int rw, void* userdata)
Expand Down
3 changes: 2 additions & 1 deletion examples/mqttexample.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
#endif

/* certs are either static or extern, depending on the specific example */
#ifndef EXTERNAL_MQTT_TLS_CALLBACK
#ifdef WOLFMQTT_EXTERN_CERT
#undef WOLFMQTT_EXAMPLE_CERT
#define WOLFMQTT_EXAMPLE_CERT /* init extern from mqttexample.h */
Expand All @@ -108,7 +109,7 @@
#undef WOLFMQTT_EXAMPLE_CERT
#define WOLFMQTT_EXAMPLE_CERT static
#endif

#endif /* !EXTERNAL_MQTT_TLS_CALLBACK */
/* MQTT Client state */
typedef enum _MQTTCtxState {
WMQ_BEGIN = 0,
Expand Down
194 changes: 194 additions & 0 deletions examples/mqttnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,198 @@ static int NetRead(void *context, byte* buf, int buf_len,
}


/* -------------------------------------------------------------------------- */
/* NETX SOCKET BACKEND EXAMPLE */
/* -------------------------------------------------------------------------- */
#elif defined(HAVE_NETX)

static int NetDisconnect(void *context)
{
SocketContext *sock = (SocketContext*)context;
if (sock) {
nx_tcp_socket_disconnect(&sock->fd, NX_NO_WAIT);
nx_tcp_socket_delete(&sock->fd);

sock->stat = SOCK_BEGIN;
}
return 0;
}

static int NetConnect(void *context, const char* host, word16 port,
int timeout_ms)
{
SocketContext *sock = (SocketContext*)context;
int rc = MQTT_CODE_ERROR_NETWORK;
MQTTCtx* mqttCtx = sock->mqttCtx;
UINT status;
NXD_ADDRESS ipAddress;
UINT ticks;

/* Get address information for host and locate IPv4 */
switch(sock->stat) {
case SOCK_BEGIN:
{
if (mqttCtx->debug_on) {
PRINTF("NetConnect: Host %s, Port %u, Timeout %d ms, "
"Use TLS %d", host, port, timeout_ms, mqttCtx->use_tls);
}
#ifndef WOLFMQTT_NO_NETX_DNS
/* Convert hostname to IP address using NETX DUO DNS */
status = nxd_dns_host_by_name_get(sock->ipPtr, (UCHAR *)host,
&ipAddress, timeout_ms);
if (status != NX_SUCCESS) {
PRINTF("DNS lookup failed: %d", status);
return MQTT_CODE_ERROR_NETWORK;
}
#else
PRINTF("DNS lookup not available");
return MQTT_CODE_ERROR_NETWORK;
#endif
status = nx_tcp_socket_create(sock->ipPtr, &sock->fd,
"MQTT Socket", NX_IP_NORMAL,
NX_FRAGMENT_OKAY, NX_IP_TIME_TO_LIVE,
1024, NX_NULL, NX_NULL);
if (status != NX_SUCCESS) {
PRINTF("Socket create failed: %d", status);
return MQTT_CODE_ERROR_NETWORK;
}

/* Bind the socket to a local port */
status = nx_tcp_client_socket_bind(&sock->fd, port, timeout_ms);
if (status != NX_SUCCESS) {
PRINTF("Socket bind failed: %d", status);
return MQTT_CODE_ERROR_NETWORK;
}

sock->stat = SOCK_CONN;
}
FALL_THROUGH;

case SOCK_CONN:
{
/* Convert timeout_ms to ticks and round up */
ticks = (timeout_ms * TX_TIMER_TICKS_PER_SECOND + 999) / 1000;

/* Connect to server using NETX DUO */
status = nxd_tcp_client_socket_connect(&sock->fd, &ipAddress, port,
ticks);
if (status != NX_SUCCESS) {
PRINTF("Socket connect failed: %d", status);
NetDisconnect(context);
return MQTT_CODE_ERROR_NETWORK;
}
return MQTT_CODE_SUCCESS;
}

default:
rc = MQTT_CODE_ERROR_BAD_ARG;
break;
} /* switch */

return rc;
}


static int NetWrite(void *context, const byte* buf, int buf_len, int timeout_ms)
{
SocketContext *sock = (SocketContext*)context;
NX_PACKET* packet;
NX_PACKET_POOL* pool; /* shorthand */
UINT status;
UINT ticks;

if (sock == NULL) {
PRINTF("NetX Send NULL parameters");
return MQTT_CODE_ERROR_BAD_ARG;
}

pool = sock->fd.nx_tcp_socket_ip_ptr->nx_ip_default_packet_pool;
status = nx_packet_allocate(pool, &packet, NX_TCP_PACKET, timeout_ms);
if (status != NX_SUCCESS) {
PRINTF("NetX Send packet alloc error");
return MQTT_CODE_ERROR_NETWORK;
}

status = nx_packet_data_append(packet, (VOID*)buf, buf_len, pool,
timeout_ms);
if (status != NX_SUCCESS) {
nx_packet_release(packet);
PRINTF("NetX Send data append error");
return MQTT_CODE_ERROR_NETWORK;
}


/* Convert timeout_ms to ticks and round up */
ticks = (timeout_ms * TX_TIMER_TICKS_PER_SECOND + 999) / 1000;

status = nx_tcp_socket_send(&sock->fd, packet, ticks);
if (status != NX_SUCCESS) {
nx_packet_release(packet);
PRINTF("NetX Send socket send error");
return MQTT_CODE_ERROR_NETWORK;
}

return buf_len;
}


static int NetRead(void *context, byte* buf, int buf_len, int timeout_ms)
{
SocketContext *sock = (SocketContext*)context;
ULONG left;
ULONG total;
ULONG copied = 0;
UINT status;
UINT ticks;

if (sock == NULL) {
PRINTF("NetX Recv NULL parameters");
return MQTT_CODE_ERROR_BAD_ARG;
}

if (sock->nxPacket == NULL) {
/* Convert timeout_ms to ticks and round up */
ticks = (timeout_ms * TX_TIMER_TICKS_PER_SECOND + 999) / 1000;
status = nx_tcp_socket_receive(&sock->fd, &sock->nxPacket, ticks);
if (status != NX_SUCCESS) {
if (status == NX_NO_PACKET) {
PRINTF("NetX Recv timeout");
return MQTT_CODE_ERROR_TIMEOUT;
}
PRINTF("NetX Recv receive error");
return MQTT_CODE_ERROR_NETWORK;
}
}

if (sock->nxPacket) {
status = nx_packet_length_get(sock->nxPacket, &total);
if (status != NX_SUCCESS) {
PRINTF("NetX Recv length get error");
return MQTT_CODE_ERROR_NETWORK;
}

left = total - sock->nxOffset;
status = nx_packet_data_extract_offset(sock->nxPacket,
sock->nxOffset, buf, buf_len, &copied);
if (status != NX_SUCCESS) {
PRINTF("NetX Recv data extract offset error");
return MQTT_CODE_ERROR_NETWORK;
}

sock->nxOffset += copied;

if (copied == left) {
PRINTF("NetX Recv Drained packet");
nx_packet_release(sock->nxPacket);
sock->nxPacket = NULL;
sock->nxOffset = 0;
}
}

return copied;
}


/* -------------------------------------------------------------------------- */
/* CURL EASY SOCKET BACKEND EXAMPLE */
/* -------------------------------------------------------------------------- */
Expand Down Expand Up @@ -1606,7 +1798,9 @@ int MqttClientNet_Init(MqttNet* net, MQTTCtx* mqttCtx)
#if defined(ENABLE_MQTT_CURL)
sockCtx->curl = NULL;
#endif
#if !defined(HAVE_NETX)
sockCtx->fd = SOCKET_INVALID;
#endif
sockCtx->stat = SOCK_BEGIN;
sockCtx->mqttCtx = mqttCtx;

Expand Down
5 changes: 5 additions & 0 deletions examples/mqttnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ typedef struct _SocketContext {
#endif
#ifdef ENABLE_MQTT_WEBSOCKET
void* websocket_ctx;
#endif
#ifdef HAVE_NETX
NX_IP *ipPtr;
NX_PACKET *nxPacket;
ULONG nxOffset;
#endif
MQTTCtx* mqttCtx;
} SocketContext;
Expand Down
7 changes: 7 additions & 0 deletions examples/mqttport.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ extern "C" {
#elif defined(WOLFMQTT_USER_IO)
#include "userio_template.h"

/* NetX */
#elif defined(HAVE_NETX)
#include "nx_api.h"

#define SOCKET_T NX_TCP_SOCKET
#define SOCK_ADDR_IN NXD_ADDRESS

/* Windows */
#elif defined(USE_WINDOWS_API)
#include <winsock2.h>
Expand Down
29 changes: 29 additions & 0 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,35 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg);
return 0;
}

#elif defined(THREADX)
/* ThreadX semaphore */
int wm_SemInit(wm_Sem *s) {
if (tx_semaphore_create(s, NULL, 1) != TX_SUCCESS) {
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM);
}
return 0;
}
int wm_SemFree(wm_Sem *s) {
if (tx_semaphore_delete(s) != TX_SUCCESS) {
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM);
}
return 0;
}

static UINT semstatus;
int wm_SemLock(wm_Sem *s) {
semstatus = tx_semaphore_get(s, TX_WAIT_FOREVER);
if (semstatus != TX_SUCCESS) {
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM);
}
return 0;
}
int wm_SemUnlock(wm_Sem *s) {
if (tx_semaphore_put(s) != TX_SUCCESS) {
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_SYSTEM);
}
return 0;
}
#endif /* MUTEX */
#endif /* WOLFMQTT_MULTITHREAD */

Expand Down
4 changes: 4 additions & 0 deletions wolfmqtt/mqtt_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@
#include <windows.h>
typedef HANDLE wm_Sem;

#elif defined(THREADX)
#include <tx_api.h>
typedef TX_SEMAPHORE wm_Sem;

#else
#error "Multithreading requires binary semaphore implementation!"
#endif
Expand Down