Skip to content

Commit 38a25f5

Browse files
committed
Increase robustness of LwsApiCall implementation
- Handle partial writes by sending the remaining data over multiple iterations - Retry when sending a message fails - Track and handle messages received in parts using the `receiveMessage` flag
1 parent cf817bc commit 38a25f5

File tree

2 files changed

+113
-47
lines changed

2 files changed

+113
-47
lines changed

src/source/Signaling/LwsApiCalls.c

Lines changed: 110 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,9 @@ INT32 lwsHttpCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason,
246246
if (size != (INT32) pRequestInfo->bodySize) {
247247
DLOGW("Failed to write out the body of POST request entirely. Expected to write %d, wrote %d", pRequestInfo->bodySize, size);
248248
if (size > 0) {
249-
// Schedule again
249+
// Update remaining data and schedule again
250+
pRequestInfo->bodySize -= size;
251+
pRequestInfo->body += size;
250252
lws_client_http_body_pending(wsi, 1);
251253
lws_callback_on_writable(wsi);
252254
} else {
@@ -306,6 +308,8 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P
306308
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
307309
case LWS_CALLBACK_CLIENT_RECEIVE:
308310
case LWS_CALLBACK_CLIENT_WRITEABLE:
311+
case LWS_CALLBACK_TIMER:
312+
case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
309313
break;
310314
default:
311315
DLOGI("WSS callback with reason %d", reason);
@@ -378,9 +382,16 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P
378382
pSignalingClient->diagnostics.connectTime = SIGNALING_GET_CURRENT_TIME(pSignalingClient);
379383
MUTEX_UNLOCK(pSignalingClient->diagnosticsLock);
380384

385+
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
381386
// Notify the listener thread
382387
CVAR_BROADCAST(pSignalingClient->connectedCvar);
383388

389+
// Keep connection alive
390+
lws_callback_on_writable(wsi);
391+
break;
392+
case LWS_CALLBACK_TIMER:
393+
lws_callback_on_writable(wsi);
394+
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
384395
break;
385396

386397
case LWS_CALLBACK_CLIENT_CLOSED:
@@ -426,79 +437,111 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P
426437

427438
DLOGD("Peer initiated close with %d (0x%08x). Message: %.*s", status, (UINT32) status, size, pCurPtr);
428439

429-
// Store the state as the result
430-
retValue = -1;
440+
if ((status != 0 && status != LWS_CLOSE_STATUS_NORMAL) && !ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
441+
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_INTERNAL_ERROR);
442+
retValue = -1;
443+
} else {
444+
// Store normal closure status
445+
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_RESULT_OK);
446+
retValue = 0;
447+
}
431448

432-
ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) status);
449+
break;
433450

451+
case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
452+
DLOGV("Received PONG from server");
434453
break;
435454

436455
case LWS_CALLBACK_CLIENT_RECEIVE:
437456

457+
lwsl_info("WS receive callback, len: %zu, is_final: %d\n",
458+
dataSize, lws_is_final_fragment(wsi));
459+
438460
// Check if it's a binary data
461+
if (lws_frame_is_binary(wsi)) {
462+
DLOGW("Received binary data");
463+
}
439464
CHK(!lws_frame_is_binary(wsi), STATUS_SIGNALING_RECEIVE_BINARY_DATA_NOT_SUPPORTED);
440465

441-
// Skip if it's the first and last fragment and the size is 0
442-
CHK(!(lws_is_first_fragment(wsi) && lws_is_final_fragment(wsi) && dataSize == 0), retStatus);
466+
// Mark as receiving a message
467+
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, TRUE);
443468

444-
// Check what type of a message it is. We will set the size to 0 on first and flush on last
445-
if (lws_is_first_fragment(wsi)) {
446-
pLwsCallInfo->receiveBufferSize = 0;
447-
}
448-
449-
// Store the data in the buffer
469+
// Store the data in the receive buffer
450470
CHK(pLwsCallInfo->receiveBufferSize + (UINT32) dataSize + LWS_PRE <= SIZEOF(pLwsCallInfo->receiveBuffer),
451471
STATUS_SIGNALING_RECEIVED_MESSAGE_LARGER_THAN_MAX_DATA_LEN);
452472
MEMCPY(&pLwsCallInfo->receiveBuffer[LWS_PRE + pLwsCallInfo->receiveBufferSize], pDataIn, dataSize);
453473
pLwsCallInfo->receiveBufferSize += (UINT32) dataSize;
454474

455-
// Flush on last
475+
// Process complete message
456476
if (lws_is_final_fragment(wsi)) {
457-
CHK_STATUS(receiveLwsMessage(pLwsCallInfo->pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
477+
CHK_STATUS(receiveLwsMessage(pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
458478
pLwsCallInfo->receiveBufferSize / SIZEOF(CHAR)));
479+
pLwsCallInfo->receiveBufferSize = 0;
480+
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);
459481
}
460482

461-
lws_callback_on_writable(wsi);
462-
483+
// Keep connection alive after receiving data
484+
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
485+
lws_callback_on_writable(wsi);
486+
}
463487
break;
464488

465489
case LWS_CALLBACK_CLIENT_WRITEABLE:
466490
DLOGD("Client is writable");
467491

468-
// Check if we are attempting to terminate the connection
469-
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->connected) && ATOMIC_LOAD(&pSignalingClient->messageResult) == SERVICE_CALL_UNKNOWN) {
470-
retValue = 1;
471-
CHK(FALSE, retStatus);
492+
// Add buffer state check
493+
if (lws_send_pipe_choked(wsi)) {
494+
DLOGI("WS send pipe choked, retrying");
495+
lws_callback_on_writable(wsi);
496+
break;
472497
}
473498

474-
offset = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendOffset);
475-
bufferSize = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendBufferSize);
476-
writeSize = (INT32) (bufferSize - offset);
477-
478-
// Check if we need to do anything
479-
CHK(writeSize > 0, retStatus);
499+
// Log buffer state before write
500+
DLOGD("Send buffer size before write: %zu", pLwsCallInfo->sendBufferSize);
480501

481-
// Send data and notify on completion
482-
size = lws_write(wsi, &(pLwsCallInfo->sendBuffer[pLwsCallInfo->sendOffset]), (SIZE_T) writeSize, LWS_WRITE_TEXT);
483-
484-
if (size < 0) {
485-
DLOGW("Write failed. Returned write size is %d", size);
486-
// Quit
487-
retValue = -1;
488-
CHK(FALSE, retStatus);
502+
// Only check termination if we're not in the middle of receiving a message
503+
if (!ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
504+
CHK(!ATOMIC_LOAD_BOOL(&pRequestInfo->terminating), retStatus);
489505
}
490506

491-
if (size == writeSize) {
492-
// Notify the listener
493-
ATOMIC_STORE(&pLwsCallInfo->sendOffset, 0);
494-
ATOMIC_STORE(&pLwsCallInfo->sendBufferSize, 0);
495-
CVAR_BROADCAST(pLwsCallInfo->pSignalingClient->sendCvar);
496-
} else {
497-
// Partial write
498-
DLOGV("Failed to write out the data entirely. Wrote %d out of %d", size, writeSize);
499-
// Schedule again
500-
lws_callback_on_writable(wsi);
507+
// Send data if anything is in the buffer
508+
if (pLwsCallInfo->sendBufferSize != 0) {
509+
SIZE_T remainingSize = pLwsCallInfo->sendBufferSize - LWS_PRE;
510+
// Log write attempt
511+
DLOGD("Attempting to write %zu bytes", remainingSize);
512+
513+
retValue = (INT32) lws_write(wsi, pLwsCallInfo->sendBuffer + LWS_PRE,
514+
remainingSize, LWS_WRITE_TEXT);
515+
if (retValue < 0) {
516+
DLOGW("Write failed with %d", retValue);
517+
CHK(FALSE, !STATUS_SUCCESS);
518+
} else if ((SIZE_T) retValue < remainingSize) {
519+
DLOGW("Partial write occurred: %d of %zu bytes",
520+
retValue, remainingSize);
521+
// Move remaining data to start of buffer
522+
MEMMOVE(pLwsCallInfo->sendBuffer + LWS_PRE,
523+
pLwsCallInfo->sendBuffer + LWS_PRE + retValue,
524+
remainingSize - retValue);
525+
// Update buffer size
526+
pLwsCallInfo->sendBufferSize = (remainingSize - retValue) + LWS_PRE;
527+
528+
// Handle partial write
529+
lws_callback_on_writable(wsi);
530+
} else {
531+
// Complete write
532+
DLOGI("Write complete: %d bytes", retValue);
533+
pLwsCallInfo->sendBufferSize = 0;
534+
// Keep connection alive after write
535+
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
536+
lws_callback_on_writable(wsi);
537+
}
538+
ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_RESULT_OK);
539+
// Signal completion immediately after successful write
540+
CVAR_BROADCAST(pSignalingClient->sendCvar);
541+
}
501542
}
543+
// Always return success from writeable callback
544+
retValue = 0;
502545

503546
break;
504547

@@ -569,8 +612,10 @@ STATUS lwsCompleteSync(PLwsCallInfo pCallInfo)
569612
// Execute the LWS REST call
570613
MEMSET(&connectInfo, 0x00, SIZEOF(struct lws_client_connect_info));
571614
connectInfo.context = pContext;
572-
connectInfo.ssl_connection = LCCSCF_USE_SSL;
615+
connectInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_H2_QUIRK_OVERFLOWS_TXCR; // Add flag to handle H2 flow control
573616
connectInfo.port = SIGNALING_DEFAULT_SSL_PORT;
617+
connectInfo.alpn = "http/1.1"; // Force HTTP/1.1 only
618+
connectInfo.protocol = "http/1.1"; // Force HTTP/1.1 protocol
574619

575620
CHK_STATUS(getRequestHost(pCallInfo->callInfo.pRequestInfo->url, &pHostStart, &pHostEnd));
576621
CHK(pHostEnd == NULL || *pHostEnd == '/' || *pHostEnd == '?', STATUS_INTERNAL_ERROR);
@@ -1344,6 +1389,7 @@ STATUS createLwsCallInfo(PSignalingClient pSignalingClient, PRequestInfo pReques
13441389
pLwsCallInfo->callInfo.pRequestInfo = pRequestInfo;
13451390
pLwsCallInfo->pSignalingClient = pSignalingClient;
13461391
pLwsCallInfo->protocolIndex = protocolIndex;
1392+
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);
13471393

13481394
*ppLwsCallInfo = pLwsCallInfo;
13491395

@@ -1907,6 +1953,9 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
19071953
SIZE_T offset, size;
19081954
SERVICE_CALL_RESULT result;
19091955

1956+
UINT32 retryCount = 0;
1957+
const UINT32 MAX_RETRY_COUNT = 3;
1958+
19101959
CHK(pSignalingClient != NULL && pSignalingClient->pOngoingCallInfo != NULL, STATUS_NULL_ARG);
19111960

19121961
// See if anything needs to be done
@@ -1920,22 +1969,36 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
19201969

19211970
MUTEX_LOCK(pSignalingClient->sendLock);
19221971
sendLocked = TRUE;
1923-
while (iterate) {
1972+
while (iterate && retryCount < MAX_RETRY_COUNT) {
19241973
offset = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendOffset);
19251974
size = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendBufferSize);
19261975

19271976
result = (SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->messageResult);
19281977

19291978
if (offset != size && result == SERVICE_CALL_RESULT_NOT_SET) {
19301979
CHK_STATUS(CVAR_WAIT(pSignalingClient->sendCvar, pSignalingClient->sendLock, SIGNALING_SEND_TIMEOUT));
1980+
retryCount++;
1981+
} else if (result == SERVICE_CALL_UNKNOWN) {
1982+
// Partial write occurred, continue waiting
1983+
retryCount = 0;
1984+
continue;
19311985
} else {
19321986
iterate = FALSE;
19331987
}
1988+
if (iterate) {
1989+
// Wake up the service event loop
1990+
CHK_STATUS(wakeLwsServiceEventLoop(pSignalingClient, PROTOCOL_INDEX_WSS));
1991+
}
19341992
}
1935-
19361993
MUTEX_UNLOCK(pSignalingClient->sendLock);
19371994
sendLocked = FALSE;
19381995

1996+
// Check if we timed out
1997+
if (retryCount >= MAX_RETRY_COUNT) {
1998+
DLOGW("Failed to send data after %d attempts", MAX_RETRY_COUNT);
1999+
CHK(FALSE, STATUS_SIGNALING_MESSAGE_DELIVERY_FAILED);
2000+
}
2001+
19392002
// Do not await for the response in case of correlation id not specified
19402003
CHK(awaitForResponse, retStatus);
19412004

src/source/Signaling/LwsApiCalls.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ struct __LwsCallInfo {
208208
// Service exit indicator
209209
volatile ATOMIC_BOOL cancelService;
210210

211+
// Message receiving indicator
212+
volatile ATOMIC_BOOL receiveMessage;
213+
211214
// Protocol index
212215
UINT32 protocolIndex;
213216

0 commit comments

Comments
 (0)