Skip to content

Commit 9f7ebd9

Browse files
committed
Increase robustness of LwsApiCall implementation
- Handle partial writes by sending data in multiple iterations
- Use retries when a message send fails
- Track and handle messages received in parts using the `receiveMessage` variable
1 parent cf817bc commit 9f7ebd9

File tree

2 files changed

+108
-47
lines changed

2 files changed

+108
-47
lines changed

src/source/Signaling/LwsApiCalls.c

Lines changed: 105 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,9 @@ INT32 lwsHttpCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason,
246246
if (size != (INT32) pRequestInfo->bodySize) {
247247
DLOGW("Failed to write out the body of POST request entirely. Expected to write %d, wrote %d", pRequestInfo->bodySize, size);
248248
if (size > 0) {
249-
// Schedule again
249+
// Update remaining data and schedule again
250+
pRequestInfo->bodySize -= size;
251+
pRequestInfo->body += size;
250252
lws_client_http_body_pending(wsi, 1);
251253
lws_callback_on_writable(wsi);
252254
} else {
@@ -306,6 +308,8 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P
306308
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
307309
case LWS_CALLBACK_CLIENT_RECEIVE:
308310
case LWS_CALLBACK_CLIENT_WRITEABLE:
311+
case LWS_CALLBACK_TIMER:
312+
case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
309313
break;
310314
default:
311315
DLOGI("WSS callback with reason %d", reason);
@@ -378,9 +382,16 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P
378382
pSignalingClient->diagnostics.connectTime = SIGNALING_GET_CURRENT_TIME(pSignalingClient);
379383
MUTEX_UNLOCK(pSignalingClient->diagnosticsLock);
380384

385+
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
381386
// Notify the listener thread
382387
CVAR_BROADCAST(pSignalingClient->connectedCvar);
383388

389+
// Keep connection alive
390+
lws_callback_on_writable(wsi);
391+
break;
392+
case LWS_CALLBACK_TIMER:
393+
lws_callback_on_writable(wsi);
394+
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
384395
break;
385396

386397
case LWS_CALLBACK_CLIENT_CLOSED:
@@ -426,79 +437,106 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P
426437

427438
DLOGD("Peer initiated close with %d (0x%08x). Message: %.*s", status, (UINT32) status, size, pCurPtr);
428439

429-
// Store the state as the result
430-
retValue = -1;
440+
if ((status != 0 && status != LWS_CLOSE_STATUS_NORMAL) && !ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
441+
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_INTERNAL_ERROR);
442+
retValue = -1;
443+
} else {
444+
// Store normal closure status
445+
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_RESULT_OK);
446+
retValue = 0;
447+
}
431448

432-
ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) status);
449+
break;
433450

451+
case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
452+
DLOGV("Received PONG from server");
434453
break;
435454

436455
case LWS_CALLBACK_CLIENT_RECEIVE:
437456

457+
lwsl_info("WS receive callback, len: %zu, is_final: %d\n", dataSize, lws_is_final_fragment(wsi));
458+
438459
// Check if it's a binary data
460+
if (lws_frame_is_binary(wsi)) {
461+
DLOGW("Received binary data");
462+
}
439463
CHK(!lws_frame_is_binary(wsi), STATUS_SIGNALING_RECEIVE_BINARY_DATA_NOT_SUPPORTED);
440464

441-
// Skip if it's the first and last fragment and the size is 0
442-
CHK(!(lws_is_first_fragment(wsi) && lws_is_final_fragment(wsi) && dataSize == 0), retStatus);
465+
// Mark as receiving a message
466+
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, TRUE);
443467

444-
// Check what type of a message it is. We will set the size to 0 on first and flush on last
445-
if (lws_is_first_fragment(wsi)) {
446-
pLwsCallInfo->receiveBufferSize = 0;
447-
}
448-
449-
// Store the data in the buffer
468+
// Store the data in the receive buffer
450469
CHK(pLwsCallInfo->receiveBufferSize + (UINT32) dataSize + LWS_PRE <= SIZEOF(pLwsCallInfo->receiveBuffer),
451470
STATUS_SIGNALING_RECEIVED_MESSAGE_LARGER_THAN_MAX_DATA_LEN);
452471
MEMCPY(&pLwsCallInfo->receiveBuffer[LWS_PRE + pLwsCallInfo->receiveBufferSize], pDataIn, dataSize);
453472
pLwsCallInfo->receiveBufferSize += (UINT32) dataSize;
454473

455-
// Flush on last
474+
// Process complete message
456475
if (lws_is_final_fragment(wsi)) {
457-
CHK_STATUS(receiveLwsMessage(pLwsCallInfo->pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
476+
CHK_STATUS(receiveLwsMessage(pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
458477
pLwsCallInfo->receiveBufferSize / SIZEOF(CHAR)));
478+
pLwsCallInfo->receiveBufferSize = 0;
479+
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);
459480
}
460481

461-
lws_callback_on_writable(wsi);
462-
482+
// Keep connection alive after receiving data
483+
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
484+
lws_callback_on_writable(wsi);
485+
}
463486
break;
464487

465488
case LWS_CALLBACK_CLIENT_WRITEABLE:
466489
DLOGD("Client is writable");
467490

468-
// Check if we are attempting to terminate the connection
469-
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->connected) && ATOMIC_LOAD(&pSignalingClient->messageResult) == SERVICE_CALL_UNKNOWN) {
470-
retValue = 1;
471-
CHK(FALSE, retStatus);
491+
// Add buffer state check
492+
if (lws_send_pipe_choked(wsi)) {
493+
DLOGI("WS send pipe choked, retrying");
494+
lws_callback_on_writable(wsi);
495+
break;
472496
}
473497

474-
offset = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendOffset);
475-
bufferSize = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendBufferSize);
476-
writeSize = (INT32) (bufferSize - offset);
477-
478-
// Check if we need to do anything
479-
CHK(writeSize > 0, retStatus);
498+
// Log buffer state before write
499+
DLOGD("Send buffer size before write: %zu", pLwsCallInfo->sendBufferSize);
480500

481-
// Send data and notify on completion
482-
size = lws_write(wsi, &(pLwsCallInfo->sendBuffer[pLwsCallInfo->sendOffset]), (SIZE_T) writeSize, LWS_WRITE_TEXT);
483-
484-
if (size < 0) {
485-
DLOGW("Write failed. Returned write size is %d", size);
486-
// Quit
487-
retValue = -1;
488-
CHK(FALSE, retStatus);
501+
// Only check termination if we're not in the middle of receiving a message
502+
if (!ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
503+
CHK(!ATOMIC_LOAD_BOOL(&pRequestInfo->terminating), retStatus);
489504
}
490505

491-
if (size == writeSize) {
492-
// Notify the listener
493-
ATOMIC_STORE(&pLwsCallInfo->sendOffset, 0);
494-
ATOMIC_STORE(&pLwsCallInfo->sendBufferSize, 0);
495-
CVAR_BROADCAST(pLwsCallInfo->pSignalingClient->sendCvar);
496-
} else {
497-
// Partial write
498-
DLOGV("Failed to write out the data entirely. Wrote %d out of %d", size, writeSize);
499-
// Schedule again
500-
lws_callback_on_writable(wsi);
506+
// Send data if anything is in the buffer
507+
if (pLwsCallInfo->sendBufferSize != 0) {
508+
SIZE_T remainingSize = pLwsCallInfo->sendBufferSize - LWS_PRE;
509+
// Log write attempt
510+
DLOGD("Attempting to write %zu bytes", remainingSize);
511+
512+
retValue = (INT32) lws_write(wsi, pLwsCallInfo->sendBuffer + LWS_PRE, remainingSize, LWS_WRITE_TEXT);
513+
if (retValue < 0) {
514+
DLOGW("Write failed with %d", retValue);
515+
CHK(FALSE, !STATUS_SUCCESS);
516+
} else if ((SIZE_T) retValue < remainingSize) {
517+
DLOGW("Partial write occurred: %d of %zu bytes", retValue, remainingSize);
518+
// Move remaining data to start of buffer
519+
MEMMOVE(pLwsCallInfo->sendBuffer + LWS_PRE, pLwsCallInfo->sendBuffer + LWS_PRE + retValue, remainingSize - retValue);
520+
// Update buffer size
521+
pLwsCallInfo->sendBufferSize = (remainingSize - retValue) + LWS_PRE;
522+
523+
// Handle partial write
524+
lws_callback_on_writable(wsi);
525+
} else {
526+
// Complete write
527+
DLOGI("Write complete: %d bytes", retValue);
528+
pLwsCallInfo->sendBufferSize = 0;
529+
// Keep connection alive after write
530+
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
531+
lws_callback_on_writable(wsi);
532+
}
533+
ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_RESULT_OK);
534+
// Signal completion immediately after successful write
535+
CVAR_BROADCAST(pSignalingClient->sendCvar);
536+
}
501537
}
538+
// Always return success from writeable callback
539+
retValue = 0;
502540

503541
break;
504542

@@ -569,8 +607,10 @@ STATUS lwsCompleteSync(PLwsCallInfo pCallInfo)
569607
// Execute the LWS REST call
570608
MEMSET(&connectInfo, 0x00, SIZEOF(struct lws_client_connect_info));
571609
connectInfo.context = pContext;
572-
connectInfo.ssl_connection = LCCSCF_USE_SSL;
610+
connectInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_H2_QUIRK_OVERFLOWS_TXCR; // Add flag to handle H2 flow control
573611
connectInfo.port = SIGNALING_DEFAULT_SSL_PORT;
612+
connectInfo.alpn = "http/1.1"; // Force HTTP/1.1 only
613+
connectInfo.protocol = "http/1.1"; // Force HTTP/1.1 protocol
574614

575615
CHK_STATUS(getRequestHost(pCallInfo->callInfo.pRequestInfo->url, &pHostStart, &pHostEnd));
576616
CHK(pHostEnd == NULL || *pHostEnd == '/' || *pHostEnd == '?', STATUS_INTERNAL_ERROR);
@@ -1344,6 +1384,7 @@ STATUS createLwsCallInfo(PSignalingClient pSignalingClient, PRequestInfo pReques
13441384
pLwsCallInfo->callInfo.pRequestInfo = pRequestInfo;
13451385
pLwsCallInfo->pSignalingClient = pSignalingClient;
13461386
pLwsCallInfo->protocolIndex = protocolIndex;
1387+
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);
13471388

13481389
*ppLwsCallInfo = pLwsCallInfo;
13491390

@@ -1907,6 +1948,9 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
19071948
SIZE_T offset, size;
19081949
SERVICE_CALL_RESULT result;
19091950

1951+
UINT32 retryCount = 0;
1952+
const UINT32 MAX_RETRY_COUNT = 3;
1953+
19101954
CHK(pSignalingClient != NULL && pSignalingClient->pOngoingCallInfo != NULL, STATUS_NULL_ARG);
19111955

19121956
// See if anything needs to be done
@@ -1920,22 +1964,36 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
19201964

19211965
MUTEX_LOCK(pSignalingClient->sendLock);
19221966
sendLocked = TRUE;
1923-
while (iterate) {
1967+
while (iterate && retryCount < MAX_RETRY_COUNT) {
19241968
offset = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendOffset);
19251969
size = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendBufferSize);
19261970

19271971
result = (SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->messageResult);
19281972

19291973
if (offset != size && result == SERVICE_CALL_RESULT_NOT_SET) {
19301974
CHK_STATUS(CVAR_WAIT(pSignalingClient->sendCvar, pSignalingClient->sendLock, SIGNALING_SEND_TIMEOUT));
1975+
retryCount++;
1976+
} else if (result == SERVICE_CALL_UNKNOWN) {
1977+
// Partial write occurred, continue waiting
1978+
retryCount = 0;
1979+
continue;
19311980
} else {
19321981
iterate = FALSE;
19331982
}
1983+
if (iterate) {
1984+
// Wake up the service event loop
1985+
CHK_STATUS(wakeLwsServiceEventLoop(pSignalingClient, PROTOCOL_INDEX_WSS));
1986+
}
19341987
}
1935-
19361988
MUTEX_UNLOCK(pSignalingClient->sendLock);
19371989
sendLocked = FALSE;
19381990

1991+
// Check if we timed out
1992+
if (retryCount >= MAX_RETRY_COUNT) {
1993+
DLOGW("Failed to send data after %d attempts", MAX_RETRY_COUNT);
1994+
CHK(FALSE, STATUS_SIGNALING_MESSAGE_DELIVERY_FAILED);
1995+
}
1996+
19391997
// Do not await for the response in case of correlation id not specified
19401998
CHK(awaitForResponse, retStatus);
19411999

src/source/Signaling/LwsApiCalls.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ struct __LwsCallInfo {
208208
// Service exit indicator;
209209
volatile ATOMIC_BOOL cancelService;
210210

211+
// Message receiving indicator
212+
volatile ATOMIC_BOOL receiveMessage;
213+
211214
// Protocol index
212215
UINT32 protocolIndex;
213216

0 commit comments

Comments
 (0)