Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 105 additions & 47 deletions src/source/Signaling/LwsApiCalls.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ INT32 lwsHttpCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
if (size != (INT32) pRequestInfo->bodySize) {
DLOGW("Failed to write out the body of POST request entirely. Expected to write %d, wrote %d", pRequestInfo->bodySize, size);
if (size > 0) {
// Schedule again
// Update remainig data and schedule again
pRequestInfo->bodySize -= size;
pRequestInfo->body += size;
lws_client_http_body_pending((struct lws*) wsi, 1);
lws_callback_on_writable((struct lws*) wsi);
} else {
Expand Down Expand Up @@ -311,6 +313,8 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_CLIENT_WRITEABLE:
case LWS_CALLBACK_TIMER:
case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
break;
default:
DLOGI("WSS callback with reason %d", reason);
Expand Down Expand Up @@ -383,9 +387,16 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
pSignalingClient->diagnostics.connectTime = SIGNALING_GET_CURRENT_TIME(pSignalingClient);
MUTEX_UNLOCK(pSignalingClient->diagnosticsLock);

lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
// Notify the listener thread
CVAR_BROADCAST(pSignalingClient->connectedCvar);

// Keep connection alive
lws_callback_on_writable(wsi);
break;
case LWS_CALLBACK_TIMER:
lws_callback_on_writable(wsi);
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
break;

case LWS_CALLBACK_CLIENT_CLOSED:
Expand Down Expand Up @@ -431,79 +442,106 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,

DLOGD("Peer initiated close with %d (0x%08x). Message: %.*s", status, (UINT32) status, size, pCurPtr);

// Store the state as the result
retValue = -1;
if ((status != 0 && status != LWS_CLOSE_STATUS_NORMAL) && !ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_INTERNAL_ERROR);
retValue = -1;
} else {
// Store normal closure status
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_RESULT_OK);
retValue = 0;
}

ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) status);
break;

case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
DLOGV("Received PONG from server");
break;

case LWS_CALLBACK_CLIENT_RECEIVE:

lwsl_info("WS receive callback, len: %zu, is_final: %d\n", dataSize, lws_is_final_fragment(wsi));

// Check if it's a binary data
if (lws_frame_is_binary(wsi)) {
DLOGW("Received binary data");
}
CHK(!lws_frame_is_binary(wsi), STATUS_SIGNALING_RECEIVE_BINARY_DATA_NOT_SUPPORTED);

// Skip if it's the first and last fragment and the size is 0
CHK(!(lws_is_first_fragment(wsi) && lws_is_final_fragment(wsi) && dataSize == 0), retStatus);

// Check what type of a message it is. We will set the size to 0 on first and flush on last
if (lws_is_first_fragment(wsi)) {
pLwsCallInfo->receiveBufferSize = 0;
}
// Mark as receiving a message
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, TRUE);

// Store the data in the buffer
// Store the data in the receive buffer
CHK(pLwsCallInfo->receiveBufferSize + (UINT32) dataSize + LWS_PRE <= SIZEOF(pLwsCallInfo->receiveBuffer),
STATUS_SIGNALING_RECEIVED_MESSAGE_LARGER_THAN_MAX_DATA_LEN);
MEMCPY(&pLwsCallInfo->receiveBuffer[LWS_PRE + pLwsCallInfo->receiveBufferSize], pDataIn, dataSize);
pLwsCallInfo->receiveBufferSize += (UINT32) dataSize;

// Flush on last
// Process complete message
if (lws_is_final_fragment(wsi)) {
CHK_STATUS(receiveLwsMessage(pLwsCallInfo->pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
CHK_STATUS(receiveLwsMessage(pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
pLwsCallInfo->receiveBufferSize / SIZEOF(CHAR)));
pLwsCallInfo->receiveBufferSize = 0;
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);
}

lws_callback_on_writable(wsi);

// Keep connection alive after receiving data
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
lws_callback_on_writable(wsi);
}
break;

case LWS_CALLBACK_CLIENT_WRITEABLE:
DLOGD("Client is writable");

// Check if we are attempting to terminate the connection
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->connected) && ATOMIC_LOAD(&pSignalingClient->messageResult) == SERVICE_CALL_UNKNOWN) {
retValue = 1;
CHK(FALSE, retStatus);
// Add buffer state check
if (lws_send_pipe_choked(wsi)) {
DLOGI("WS send pipe choked, retrying");
lws_callback_on_writable(wsi);
break;
}

offset = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendOffset);
bufferSize = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendBufferSize);
writeSize = (INT32) (bufferSize - offset);

// Check if we need to do anything
CHK(writeSize > 0, retStatus);

// Send data and notify on completion
size = lws_write(wsi, &(pLwsCallInfo->sendBuffer[pLwsCallInfo->sendOffset]), (SIZE_T) writeSize, LWS_WRITE_TEXT);
// Log buffer state before write
DLOGD("Send buffer size before write: %zu", pLwsCallInfo->sendBufferSize);

if (size < 0) {
DLOGW("Write failed. Returned write size is %d", size);
// Quit
retValue = -1;
CHK(FALSE, retStatus);
// Only check termination if we're not in the middle of receiving a message
if (!ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
CHK(!ATOMIC_LOAD_BOOL(&pRequestInfo->terminating), retStatus);
}

if (size == writeSize) {
// Notify the listener
ATOMIC_STORE(&pLwsCallInfo->sendOffset, 0);
ATOMIC_STORE(&pLwsCallInfo->sendBufferSize, 0);
CVAR_BROADCAST(pLwsCallInfo->pSignalingClient->sendCvar);
} else {
// Partial write
DLOGV("Failed to write out the data entirely. Wrote %d out of %d", size, writeSize);
// Schedule again
lws_callback_on_writable(wsi);
// Send data if anything is in the buffer
if (pLwsCallInfo->sendBufferSize != 0) {
SIZE_T remainingSize = pLwsCallInfo->sendBufferSize - LWS_PRE;
// Log write attempt
DLOGD("Attempting to write %zu bytes", remainingSize);

retValue = (INT32) lws_write(wsi, pLwsCallInfo->sendBuffer + LWS_PRE, remainingSize, LWS_WRITE_TEXT);
if (retValue < 0) {
DLOGW("Write failed with %d", retValue);
CHK(FALSE, retValue); // Return non-zero value to callback to indicate failure
} else if ((SIZE_T) retValue < remainingSize) {
DLOGW("Partial write occurred: %d of %zu bytes", retValue, remainingSize);
// Move remaining data to start of buffer
MEMMOVE(pLwsCallInfo->sendBuffer + LWS_PRE, pLwsCallInfo->sendBuffer + LWS_PRE + retValue, remainingSize - retValue);
// Update buffer size
pLwsCallInfo->sendBufferSize = (remainingSize - retValue) + LWS_PRE;

// Handle partial write
lws_callback_on_writable(wsi);
} else {
// Complete write
DLOGI("Write complete: %d bytes", retValue);
pLwsCallInfo->sendBufferSize = 0;
// Keep connection alive after write
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
lws_callback_on_writable(wsi);
}
ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_RESULT_OK);
// Signal completion immediately after successful write
CVAR_BROADCAST(pSignalingClient->sendCvar);
}
}
// Always return success from writeable callback
retValue = 0;

break;

Expand Down Expand Up @@ -574,8 +612,10 @@ STATUS lwsCompleteSync(PLwsCallInfo pCallInfo)
// Execute the LWS REST call
MEMSET(&connectInfo, 0x00, SIZEOF(struct lws_client_connect_info));
connectInfo.context = pContext;
connectInfo.ssl_connection = LCCSCF_USE_SSL;
connectInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_H2_QUIRK_OVERFLOWS_TXCR; // Add flag to handle H2 flow control
connectInfo.port = SIGNALING_DEFAULT_SSL_PORT;
connectInfo.alpn = "http/1.1"; // Force HTTP/1.1 only
connectInfo.protocol = "http/1.1"; // Force HTTP/1.1 protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering the purpose of the H2 flag if 1.1 is forced, is it for latency/performance optimization?

It seems that HTTP 2 should from this quick test:

curl -sI https://m-xxxxxxxx.kinesisvideo.us-west-2.amazonaws.com -o/dev/null -w '%{http_version}\n'
2
curl -sI https://v-xxxxxxxx.kinesisvideo.us-west-2.amazonaws.com -o/dev/null -w '%{http_version}\n'
2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to make this ws connection work more reliably on ESP platforms. Helps reduce memory consumption a bit and works much reliably. BTW, do we really gain much with http2 for our use case?

Now that, I have lot of optimisations in place, I can test http2 with ESP32 and revert the change.


CHK_STATUS(getRequestHost(pCallInfo->callInfo.pRequestInfo->url, &pHostStart, &pHostEnd));
CHK(pHostEnd == NULL || *pHostEnd == '/' || *pHostEnd == '?', STATUS_INTERNAL_ERROR);
Expand Down Expand Up @@ -1381,6 +1421,7 @@ STATUS createLwsCallInfo(PSignalingClient pSignalingClient, PRequestInfo pReques
pLwsCallInfo->callInfo.pRequestInfo = pRequestInfo;
pLwsCallInfo->pSignalingClient = pSignalingClient;
pLwsCallInfo->protocolIndex = protocolIndex;
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);

*ppLwsCallInfo = pLwsCallInfo;

Expand Down Expand Up @@ -1944,6 +1985,9 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
SIZE_T offset, size;
SERVICE_CALL_RESULT result;

UINT32 retryCount = 0;
const UINT32 MAX_RETRY_COUNT = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking this should be a #define (very very minor memory reduction)


CHK(pSignalingClient != NULL && pSignalingClient->pOngoingCallInfo != NULL, STATUS_NULL_ARG);

// See if anything needs to be done
Expand All @@ -1957,22 +2001,36 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)

MUTEX_LOCK(pSignalingClient->sendLock);
sendLocked = TRUE;
while (iterate) {
while (iterate && retryCount < MAX_RETRY_COUNT) {
offset = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendOffset);
size = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendBufferSize);

result = (SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->messageResult);

if (offset != size && result == SERVICE_CALL_RESULT_NOT_SET) {
CHK_STATUS(CVAR_WAIT(pSignalingClient->sendCvar, pSignalingClient->sendLock, SIGNALING_SEND_TIMEOUT));
retryCount++;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we can debug log (DLOGD) the retry count increased

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the CVAR_WAIT returned non-success (eg status operation timed out), CHK_STATUS would goto Cleanup and bypass the retry count -- doesn't seem intended since there's a "// Check if we timed out" down below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sirknightj looks a bit odd, but here is what's happening:

  • If CVAR_WAIT returns on timeout, we exit anyway.
  • Only if it was returned early, we increment the retryCount and go again for wait if offset != size.
    else we stop iterating and go for the rest of the code path.

So, retryCount is basically, making it retry if wakeup was within timeout.
Let me know if you have a better suggestion to handle sendCvar wake-ups without just giving up entirely.

} else if (result == SERVICE_CALL_UNKNOWN) {
// Partial write occurred, continue waiting
retryCount = 0;
continue;
} else {
iterate = FALSE;
}
if (iterate) {
// Wake up the service event loop
CHK_STATUS(wakeLwsServiceEventLoop(pSignalingClient, PROTOCOL_INDEX_WSS));
}
}

MUTEX_UNLOCK(pSignalingClient->sendLock);
sendLocked = FALSE;

// Check if we timed out
if (retryCount >= MAX_RETRY_COUNT) {
DLOGW("Failed to send data after %d attempts", MAX_RETRY_COUNT);
CHK(FALSE, STATUS_SIGNALING_MESSAGE_DELIVERY_FAILED);
}

// Do not await for the response in case of correlation id not specified
CHK(awaitForResponse, retStatus);

Expand Down
3 changes: 3 additions & 0 deletions src/source/Signaling/LwsApiCalls.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ struct __LwsCallInfo {
// Service exit indicator;
volatile ATOMIC_BOOL cancelService;

// Message receiving indicator
volatile ATOMIC_BOOL receiveMessage;

// Protocol index
UINT32 protocolIndex;

Expand Down
Loading