-
Notifications
You must be signed in to change notification settings - Fork 4.2k
feat(ws): add context logic to enterprise WS worker fixes NV-6821 #9385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
ChmaraX
merged 2 commits into
next
from
nv-6821-updating-cloudflare-sockets-with-new-context-logic
Oct 22, 2025
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,6 +49,8 @@ export class WebSocketRoom extends DurableObject<IEnv> { | |
return new Response('Missing JWT token', { status: 400 }); | ||
} | ||
|
||
const contextKeys = this.extractContextKeysFromHeader(request); | ||
|
||
const [client, server] = Object.values(new WebSocketPair()); | ||
|
||
/* | ||
|
@@ -64,8 +66,11 @@ export class WebSocketRoom extends DurableObject<IEnv> { | |
server.serializeAttachment({ | ||
jwtToken, | ||
connectedAt: Date.now(), | ||
contextKeys, | ||
}); | ||
|
||
this.logConnectionAccepted(userId, contextKeys); | ||
|
||
// Use waitUntil to allow hibernation without waiting for API call | ||
this.ctx.waitUntil( | ||
this.notifySubscriberOnlineState(userId, environmentId, true, undefined, jwtToken).catch((error) => | ||
|
@@ -122,7 +127,7 @@ export class WebSocketRoom extends DurableObject<IEnv> { | |
/** | ||
* Send message to a specific user | ||
*/ | ||
async sendToUser(userId: string, event: string, data: unknown): Promise<void> { | ||
async sendToUser(userId: string, event: string, data: unknown, contextKeys?: string[]): Promise<void> { | ||
const userConnections = this.ctx.getWebSockets(`user:${userId}`); | ||
|
||
if (userConnections.length === 0) { | ||
|
@@ -136,20 +141,30 @@ export class WebSocketRoom extends DurableObject<IEnv> { | |
timestamp: Date.now(), | ||
}); | ||
|
||
// Send to all user connections in parallel using Promise.allSettled | ||
// This prevents one failed connection from blocking others | ||
const sendPromises = userConnections.map(async (ws) => { | ||
try { | ||
ws.send(message); | ||
} catch (error) { | ||
console.error(`Failed to send message to user ${userId}:`, error); | ||
// Connection will be cleaned up automatically by Cloudflare | ||
throw error; // Re-throw to be caught by Promise.allSettled | ||
} | ||
}); | ||
if (this.isFeatureFlagOff(contextKeys)) { | ||
await this.broadcastToAllSockets(userId, event, message, userConnections); | ||
} else { | ||
await this.sendToMatchingContexts(userId, event, message, contextKeys, userConnections); | ||
} | ||
} | ||
|
||
// Wait for all sends to complete, but don't fail if some connections error | ||
await Promise.allSettled(sendPromises); | ||
/** | ||
* Context matching logic (same as ws.gateway.ts) | ||
*/ | ||
private isExactMatch(messageContextKeys: string[], inboxContextKeys?: string[]): boolean { | ||
if (inboxContextKeys === undefined) { | ||
return true; | ||
} | ||
|
||
if (messageContextKeys.length === 0) { | ||
return inboxContextKeys.length === 0; | ||
} | ||
|
||
if (messageContextKeys.length !== inboxContextKeys.length) { | ||
return false; | ||
} | ||
|
||
return messageContextKeys.every((key) => inboxContextKeys.includes(key)); | ||
} | ||
|
||
/** | ||
|
@@ -255,6 +270,7 @@ export class WebSocketRoom extends DurableObject<IEnv> { | |
environmentId, | ||
connectedAt: attachment.connectedAt || Date.now(), | ||
jwtToken: attachment.jwtToken, | ||
contextKeys: attachment.contextKeys, | ||
}; | ||
} | ||
|
||
|
@@ -276,4 +292,146 @@ export class WebSocketRoom extends DurableObject<IEnv> { | |
); | ||
} | ||
} | ||
|
||
private extractContextKeysFromHeader(request: Request): string[] | undefined { | ||
const contextKeysHeader = request.headers.get('X-Context-Keys'); | ||
|
||
if (!contextKeysHeader) { | ||
return undefined; | ||
} | ||
|
||
try { | ||
return JSON.parse(contextKeysHeader); | ||
} catch (e) { | ||
console.error('Failed to parse contextKeys:', e); | ||
|
||
return undefined; | ||
} | ||
} | ||
|
||
/** | ||
* Log connection acceptance with context information | ||
*/ | ||
private logConnectionAccepted(userId: string, contextKeys?: string[]): void { | ||
const contextDisplay = this.formatContextDisplay(contextKeys); | ||
console.log(`Connection accepted for ${userId} with contexts: ${contextDisplay}`); | ||
} | ||
|
||
/** | ||
* Format context keys for display in logs | ||
*/ | ||
private formatContextDisplay(contextKeys?: string[]): string { | ||
if (contextKeys === undefined) { | ||
return 'FF disabled'; | ||
} | ||
|
||
if (contextKeys.length === 0) { | ||
return 'no context'; | ||
} | ||
|
||
return contextKeys.join(', '); | ||
} | ||
|
||
/** | ||
* Check if feature flag is OFF (contextKeys is undefined) | ||
*/ | ||
private isFeatureFlagOff(contextKeys?: string[]): boolean { | ||
return contextKeys === undefined; | ||
} | ||
|
||
/** | ||
* Broadcast message to all user connections (FF OFF behavior) | ||
*/ | ||
private async broadcastToAllSockets( | ||
userId: string, | ||
event: string, | ||
message: string, | ||
sockets: WebSocket[] | ||
): Promise<void> { | ||
console.log(`Sending event ${event} to all ${sockets.length} socket(s) (FF disabled)`); | ||
|
||
const sendPromises = sockets.map(async (ws) => { | ||
try { | ||
ws.send(message); | ||
} catch (error) { | ||
console.error(`Failed to send message to user ${userId}:`, error); | ||
throw error; | ||
} | ||
}); | ||
|
||
await Promise.allSettled(sendPromises); | ||
} | ||
|
||
|
||
/** | ||
* Send message only to sockets with matching contexts (FF ON behavior) | ||
*/ | ||
private async sendToMatchingContexts( | ||
userId: string, | ||
event: string, | ||
message: string, | ||
messageContextKeys: string[] | undefined, | ||
sockets: WebSocket[] | ||
): Promise<void> { | ||
if (!messageContextKeys) { | ||
return; | ||
} | ||
console.log( | ||
`Sending event ${event} to ${userId} with message contexts: ${this.formatContextDisplay(messageContextKeys)} (${sockets.length} socket(s))` | ||
); | ||
|
||
const sendPromises = sockets.map(async (ws) => { | ||
const metadata = this.getConnectionMetadata(ws); | ||
|
||
if (!metadata) { | ||
console.warn(`No metadata found for socket, skipping`); | ||
|
||
return; | ||
} | ||
|
||
const inboxContextKeys = metadata.contextKeys; | ||
|
||
if (this.shouldDeliverMessage(messageContextKeys, inboxContextKeys)) { | ||
await this.deliverMessageToSocket(ws, message, userId, inboxContextKeys); | ||
} else { | ||
this.logContextMismatch(messageContextKeys, inboxContextKeys); | ||
} | ||
}); | ||
|
||
await Promise.allSettled(sendPromises); | ||
} | ||
|
||
/** | ||
* Determine if message should be delivered based on context match | ||
*/ | ||
private shouldDeliverMessage(messageContextKeys: string[], inboxContextKeys?: string[]): boolean { | ||
return this.isExactMatch(messageContextKeys, inboxContextKeys); | ||
} | ||
|
||
/** | ||
* Deliver message to a specific socket | ||
*/ | ||
private async deliverMessageToSocket( | ||
ws: WebSocket, | ||
message: string, | ||
userId: string, | ||
inboxContextKeys?: string[] | ||
): Promise<void> { | ||
try { | ||
ws.send(message); | ||
console.log(`Delivered to socket with inbox contexts: ${this.formatContextDisplay(inboxContextKeys)}`); | ||
} catch (error) { | ||
console.error(`Failed to send message to user ${userId}:`, error); | ||
throw error; | ||
} | ||
} | ||
|
||
/** | ||
* Log when a socket is skipped due to context mismatch | ||
*/ | ||
private logContextMismatch(messageContextKeys: string[], inboxContextKeys?: string[]): void { | ||
const messageDisplay = messageContextKeys.length === 0 ? 'none' : messageContextKeys.join(', '); | ||
const inboxDisplay = inboxContextKeys?.length === 0 ? 'none' : inboxContextKeys?.join(', ') || 'none'; | ||
|
||
console.log(`Skipped socket - contexts mismatch. Message: [${messageDisplay}], Inbox: [${inboxDisplay}]`); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.