-
Notifications
You must be signed in to change notification settings - Fork 362
Increase robustness of LwsApiCall implementation #2134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release-v1.14.0
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -251,7 +251,9 @@ INT32 lwsHttpCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn, | |
if (size != (INT32) pRequestInfo->bodySize) { | ||
DLOGW("Failed to write out the body of POST request entirely. Expected to write %d, wrote %d", pRequestInfo->bodySize, size); | ||
if (size > 0) { | ||
// Schedule again | ||
// Update remainig data and schedule again | ||
pRequestInfo->bodySize -= size; | ||
pRequestInfo->body += size; | ||
lws_client_http_body_pending((struct lws*) wsi, 1); | ||
lws_callback_on_writable((struct lws*) wsi); | ||
} else { | ||
|
@@ -311,6 +313,8 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn, | |
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: | ||
case LWS_CALLBACK_CLIENT_RECEIVE: | ||
case LWS_CALLBACK_CLIENT_WRITEABLE: | ||
case LWS_CALLBACK_TIMER: | ||
case LWS_CALLBACK_CLIENT_RECEIVE_PONG: | ||
break; | ||
default: | ||
DLOGI("WSS callback with reason %d", reason); | ||
|
@@ -383,9 +387,16 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn, | |
pSignalingClient->diagnostics.connectTime = SIGNALING_GET_CURRENT_TIME(pSignalingClient); | ||
MUTEX_UNLOCK(pSignalingClient->diagnosticsLock); | ||
|
||
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND); | ||
// Notify the listener thread | ||
CVAR_BROADCAST(pSignalingClient->connectedCvar); | ||
|
||
// Keep connection alive | ||
lws_callback_on_writable(wsi); | ||
break; | ||
case LWS_CALLBACK_TIMER: | ||
lws_callback_on_writable(wsi); | ||
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND); | ||
break; | ||
|
||
case LWS_CALLBACK_CLIENT_CLOSED: | ||
|
@@ -431,79 +442,106 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn, | |
|
||
DLOGD("Peer initiated close with %d (0x%08x). Message: %.*s", status, (UINT32) status, size, pCurPtr); | ||
|
||
// Store the state as the result | ||
retValue = -1; | ||
if ((status != 0 && status != LWS_CLOSE_STATUS_NORMAL) && !ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) { | ||
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_INTERNAL_ERROR); | ||
retValue = -1; | ||
} else { | ||
// Store normal closure status | ||
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_RESULT_OK); | ||
retValue = 0; | ||
} | ||
|
||
ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) status); | ||
break; | ||
|
||
case LWS_CALLBACK_CLIENT_RECEIVE_PONG: | ||
DLOGV("Received PONG from server"); | ||
break; | ||
|
||
case LWS_CALLBACK_CLIENT_RECEIVE: | ||
|
||
lwsl_info("WS receive callback, len: %zu, is_final: %d\n", dataSize, lws_is_final_fragment(wsi)); | ||
|
||
// Check if it's a binary data | ||
if (lws_frame_is_binary(wsi)) { | ||
DLOGW("Received binary data"); | ||
} | ||
CHK(!lws_frame_is_binary(wsi), STATUS_SIGNALING_RECEIVE_BINARY_DATA_NOT_SUPPORTED); | ||
|
||
// Skip if it's the first and last fragment and the size is 0 | ||
CHK(!(lws_is_first_fragment(wsi) && lws_is_final_fragment(wsi) && dataSize == 0), retStatus); | ||
|
||
// Check what type of a message it is. We will set the size to 0 on first and flush on last | ||
if (lws_is_first_fragment(wsi)) { | ||
pLwsCallInfo->receiveBufferSize = 0; | ||
} | ||
// Mark as receiving a message | ||
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, TRUE); | ||
|
||
// Store the data in the buffer | ||
// Store the data in the receive buffer | ||
CHK(pLwsCallInfo->receiveBufferSize + (UINT32) dataSize + LWS_PRE <= SIZEOF(pLwsCallInfo->receiveBuffer), | ||
STATUS_SIGNALING_RECEIVED_MESSAGE_LARGER_THAN_MAX_DATA_LEN); | ||
MEMCPY(&pLwsCallInfo->receiveBuffer[LWS_PRE + pLwsCallInfo->receiveBufferSize], pDataIn, dataSize); | ||
pLwsCallInfo->receiveBufferSize += (UINT32) dataSize; | ||
|
||
// Flush on last | ||
// Process complete message | ||
if (lws_is_final_fragment(wsi)) { | ||
CHK_STATUS(receiveLwsMessage(pLwsCallInfo->pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE], | ||
CHK_STATUS(receiveLwsMessage(pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE], | ||
pLwsCallInfo->receiveBufferSize / SIZEOF(CHAR))); | ||
pLwsCallInfo->receiveBufferSize = 0; | ||
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE); | ||
} | ||
|
||
lws_callback_on_writable(wsi); | ||
|
||
// Keep connection alive after receiving data | ||
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) { | ||
lws_callback_on_writable(wsi); | ||
} | ||
break; | ||
|
||
case LWS_CALLBACK_CLIENT_WRITEABLE: | ||
DLOGD("Client is writable"); | ||
|
||
// Check if we are attempting to terminate the connection | ||
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->connected) && ATOMIC_LOAD(&pSignalingClient->messageResult) == SERVICE_CALL_UNKNOWN) { | ||
retValue = 1; | ||
CHK(FALSE, retStatus); | ||
// Add buffer state check | ||
if (lws_send_pipe_choked(wsi)) { | ||
DLOGI("WS send pipe choked, retrying"); | ||
lws_callback_on_writable(wsi); | ||
break; | ||
} | ||
|
||
offset = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendOffset); | ||
bufferSize = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendBufferSize); | ||
writeSize = (INT32) (bufferSize - offset); | ||
|
||
// Check if we need to do anything | ||
CHK(writeSize > 0, retStatus); | ||
|
||
// Send data and notify on completion | ||
size = lws_write(wsi, &(pLwsCallInfo->sendBuffer[pLwsCallInfo->sendOffset]), (SIZE_T) writeSize, LWS_WRITE_TEXT); | ||
// Log buffer state before write | ||
DLOGD("Send buffer size before write: %zu", pLwsCallInfo->sendBufferSize); | ||
|
||
if (size < 0) { | ||
DLOGW("Write failed. Returned write size is %d", size); | ||
// Quit | ||
retValue = -1; | ||
CHK(FALSE, retStatus); | ||
// Only check termination if we're not in the middle of receiving a message | ||
if (!ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) { | ||
CHK(!ATOMIC_LOAD_BOOL(&pRequestInfo->terminating), retStatus); | ||
} | ||
|
||
if (size == writeSize) { | ||
// Notify the listener | ||
ATOMIC_STORE(&pLwsCallInfo->sendOffset, 0); | ||
ATOMIC_STORE(&pLwsCallInfo->sendBufferSize, 0); | ||
CVAR_BROADCAST(pLwsCallInfo->pSignalingClient->sendCvar); | ||
} else { | ||
// Partial write | ||
DLOGV("Failed to write out the data entirely. Wrote %d out of %d", size, writeSize); | ||
// Schedule again | ||
lws_callback_on_writable(wsi); | ||
// Send data if anything is in the buffer | ||
if (pLwsCallInfo->sendBufferSize != 0) { | ||
SIZE_T remainingSize = pLwsCallInfo->sendBufferSize - LWS_PRE; | ||
// Log write attempt | ||
DLOGD("Attempting to write %zu bytes", remainingSize); | ||
|
||
retValue = (INT32) lws_write(wsi, pLwsCallInfo->sendBuffer + LWS_PRE, remainingSize, LWS_WRITE_TEXT); | ||
if (retValue < 0) { | ||
DLOGW("Write failed with %d", retValue); | ||
CHK(FALSE, retValue); // Return non-zero value to callback to indicate failure | ||
} else if ((SIZE_T) retValue < remainingSize) { | ||
DLOGW("Partial write occurred: %d of %zu bytes", retValue, remainingSize); | ||
// Move remaining data to start of buffer | ||
MEMMOVE(pLwsCallInfo->sendBuffer + LWS_PRE, pLwsCallInfo->sendBuffer + LWS_PRE + retValue, remainingSize - retValue); | ||
// Update buffer size | ||
pLwsCallInfo->sendBufferSize = (remainingSize - retValue) + LWS_PRE; | ||
|
||
// Handle partial write | ||
lws_callback_on_writable(wsi); | ||
} else { | ||
// Complete write | ||
DLOGI("Write complete: %d bytes", retValue); | ||
pLwsCallInfo->sendBufferSize = 0; | ||
// Keep connection alive after write | ||
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) { | ||
lws_callback_on_writable(wsi); | ||
} | ||
ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_RESULT_OK); | ||
// Signal completion immediately after successful write | ||
CVAR_BROADCAST(pSignalingClient->sendCvar); | ||
} | ||
} | ||
// Always return success from writeable callback | ||
retValue = 0; | ||
|
||
break; | ||
|
||
|
@@ -574,8 +612,10 @@ STATUS lwsCompleteSync(PLwsCallInfo pCallInfo) | |
// Execute the LWS REST call | ||
MEMSET(&connectInfo, 0x00, SIZEOF(struct lws_client_connect_info)); | ||
connectInfo.context = pContext; | ||
connectInfo.ssl_connection = LCCSCF_USE_SSL; | ||
connectInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_H2_QUIRK_OVERFLOWS_TXCR; // Add flag to handle H2 flow control | ||
connectInfo.port = SIGNALING_DEFAULT_SSL_PORT; | ||
connectInfo.alpn = "http/1.1"; // Force HTTP/1.1 only | ||
connectInfo.protocol = "http/1.1"; // Force HTTP/1.1 protocol | ||
|
||
CHK_STATUS(getRequestHost(pCallInfo->callInfo.pRequestInfo->url, &pHostStart, &pHostEnd)); | ||
CHK(pHostEnd == NULL || *pHostEnd == '/' || *pHostEnd == '?', STATUS_INTERNAL_ERROR); | ||
|
@@ -1381,6 +1421,7 @@ STATUS createLwsCallInfo(PSignalingClient pSignalingClient, PRequestInfo pReques | |
pLwsCallInfo->callInfo.pRequestInfo = pRequestInfo; | ||
pLwsCallInfo->pSignalingClient = pSignalingClient; | ||
pLwsCallInfo->protocolIndex = protocolIndex; | ||
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE); | ||
|
||
*ppLwsCallInfo = pLwsCallInfo; | ||
|
||
|
@@ -1944,6 +1985,9 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse) | |
SIZE_T offset, size; | ||
SERVICE_CALL_RESULT result; | ||
|
||
UINT32 retryCount = 0; | ||
const UINT32 MAX_RETRY_COUNT = 3; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thinking this should be a |
||
|
||
CHK(pSignalingClient != NULL && pSignalingClient->pOngoingCallInfo != NULL, STATUS_NULL_ARG); | ||
|
||
// See if anything needs to be done | ||
|
@@ -1957,22 +2001,36 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse) | |
|
||
MUTEX_LOCK(pSignalingClient->sendLock); | ||
sendLocked = TRUE; | ||
while (iterate) { | ||
while (iterate && retryCount < MAX_RETRY_COUNT) { | ||
offset = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendOffset); | ||
size = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendBufferSize); | ||
|
||
result = (SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->messageResult); | ||
|
||
if (offset != size && result == SERVICE_CALL_RESULT_NOT_SET) { | ||
CHK_STATUS(CVAR_WAIT(pSignalingClient->sendCvar, pSignalingClient->sendLock, SIGNALING_SEND_TIMEOUT)); | ||
retryCount++; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering if we can debug log (DLOGD) the retry count increased There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sirknightj looks a bit odd, but here is what's happening:
So, retryCount is basically, making it |
||
} else if (result == SERVICE_CALL_UNKNOWN) { | ||
// Partial write occurred, continue waiting | ||
retryCount = 0; | ||
continue; | ||
} else { | ||
iterate = FALSE; | ||
} | ||
if (iterate) { | ||
// Wake up the service event loop | ||
CHK_STATUS(wakeLwsServiceEventLoop(pSignalingClient, PROTOCOL_INDEX_WSS)); | ||
} | ||
} | ||
|
||
MUTEX_UNLOCK(pSignalingClient->sendLock); | ||
sendLocked = FALSE; | ||
|
||
// Check if we timed out | ||
if (retryCount >= MAX_RETRY_COUNT) { | ||
DLOGW("Failed to send data after %d attempts", MAX_RETRY_COUNT); | ||
CHK(FALSE, STATUS_SIGNALING_MESSAGE_DELIVERY_FAILED); | ||
} | ||
|
||
// Do not await for the response in case of correlation id not specified | ||
CHK(awaitForResponse, retStatus); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering the purpose of the H2 flag if 1.1 is forced, is it for latency/performance optimization?
It seems that HTTP 2 should from this quick test:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this to make this ws connection work more reliably on ESP platforms. Helps reduce memory consumption a bit and works much reliably. BTW, do we really gain much with http2 for our use case?
Now that, I have lot of optimisations in place, I can test http2 with ESP32 and revert the change.