-
-
Notifications
You must be signed in to change notification settings - Fork 98
feat: websocket reconnect #405
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
mcollina
merged 25 commits into
fastify:main
from
simone-sanfratello:feat/websocket-reconnect
Feb 19, 2025
Merged
Changes from all commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
584e1f1
wip
simone-sanfratello a8dbdfd
wip
simone-sanfratello 8592261
wip
simone-sanfratello c6071e7
wip
simone-sanfratello b827e9b
wip
simone-sanfratello 54ad7a4
wip
simone-sanfratello 9350edc
wip
simone-sanfratello 7dd97b9
wip
simone-sanfratello b0a3dae
wip
simone-sanfratello ae2a31b
wip
simone-sanfratello 8f9501f
wip
simone-sanfratello 01d02b1
wip
simone-sanfratello e13f2fd
wip
simone-sanfratello ac1225a
feat: websocket reconnection
simone-sanfratello bfaf30a
feat: add onReconnect hook to wsReconnect
simone-sanfratello 51f447d
fix: await onReconnect
simone-sanfratello 78597c6
wip
simone-sanfratello 08d7394
feat: introduce hooks on message
simone-sanfratello 08bcc07
chore: default hooks
simone-sanfratello 83047a9
wip
simone-sanfratello 2610677
wip
simone-sanfratello 15e6ba8
wip
simone-sanfratello bd35703
add tests
simone-sanfratello b0f65c2
add reconnection example
simone-sanfratello 3a51c6c
add params to hooks
simone-sanfratello File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# Reconnection Example | ||
|
||
This example demonstrates how to use the reconnection feature of the proxy. | ||
|
||
It simulates an unstable target service: slow to start, unresponsive due to block of the event loop, crash and restart. | ||
|
||
The goal is to ensures a more resilient and customizable integration, minimizing disruptions caused by connection instability. | ||
|
||
|
||
## How to run | ||
|
||
Run the unstable target | ||
|
||
``` | ||
cd examples/reconnection/unstable-target | ||
npm run unstable | ||
``` | ||
|
||
Run the proxy | ||
|
||
``` | ||
cd examples/reconnection/proxy | ||
npm run start | ||
``` | ||
|
||
Then run the client | ||
|
||
``` | ||
cd examples/reconnection/client | ||
npm run start | ||
``` | ||
|
||
--- | ||
|
||
## How it works | ||
|
||
### Proxy Connection Monitoring and Recovery | ||
|
||
The proxy monitors the target connection using a ping/pong mechanism. If a pong response does not arrive on time, the connection is closed, and the proxy attempts to reconnect. | ||
|
||
If the target service crashes, the connection may close either gracefully or abruptly. Regardless of how the disconnection occurs, the proxy detects the connection loss and initiates a reconnection attempt. | ||
|
||
### Connection Stability | ||
|
||
- The connection between the client and the proxy remains unaffected by an unstable target. | ||
- The connection between the proxy and the target may be closed due to: | ||
- The target failing to respond to ping messages, even if the connection is still technically open (e.g., due to a freeze or blockage). | ||
- The target crashing and restarting. | ||
|
||
### Handling Data Loss During Reconnection | ||
|
||
The proxy supports hooks to manage potential data loss during reconnection. These hooks allow for custom logic to ensure message integrity when resending data from the client to the target. | ||
|
||
Examples of how hooks can be used based on the target service type: | ||
|
||
- GraphQL subscriptions: Resend the subscription from the last received message. | ||
- Message brokers: Resend messages starting from the last successfully processed message. | ||
|
||
In this example, the proxy re-sends the messages from the last ping to ensure all the messages are sent to the target, without any additional logic. | ||
Resending messages from the last pong ensures that the target does not miss any messages, but it may send messages more than once. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
'use strict' | ||
|
||
const WebSocket = require('ws') | ||
|
||
const port = process.env.PORT || 3001 | ||
|
||
// connect to proxy | ||
|
||
const url = `ws://localhost:${port}/` | ||
const ws = new WebSocket(url) | ||
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true }) | ||
|
||
client.setEncoding('utf8') | ||
|
||
let i = 1 | ||
setInterval(() => { | ||
client.write(JSON.stringify({ | ||
message: i | ||
})) | ||
i++ | ||
}, 1000).unref() | ||
const responses = {} | ||
|
||
client.on('data', message => { | ||
const data = JSON.parse(message) | ||
console.log('Received', data) | ||
responses[data.response] = responses[data.response] ? responses[data.response] + 1 : 1 | ||
}) | ||
|
||
client.on('error', error => { | ||
console.log('Error') | ||
console.error(error) | ||
}) | ||
|
||
client.on('close', () => { | ||
console.log('\n\n\nConnection closed') | ||
|
||
console.log('\n\n\nResponses') | ||
for (const key in responses) { | ||
if (!responses[key]) { | ||
console.log('missing', key) | ||
} else if (responses[key] !== 1) { | ||
console.log('extra messages', key, responses[key]) | ||
} | ||
} | ||
}) | ||
|
||
client.on('unexpected-response', (error) => { | ||
console.log('Unexpected response') | ||
console.error(error) | ||
}) | ||
|
||
client.on('redirect', (error) => { | ||
console.log('Redirect') | ||
console.error(error) | ||
}) | ||
|
||
client.on('upgrade', (error) => { | ||
console.log('Upgrade') | ||
console.error(error) | ||
}) | ||
|
||
client.on('ping', (error) => { | ||
console.log('Ping') | ||
console.error(error) | ||
}) | ||
|
||
client.on('pong', (error) => { | ||
console.log('Pong') | ||
console.error(error) | ||
}) | ||
|
||
process.on('SIGINT', () => { | ||
client.end() | ||
}) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{ | ||
"name": "client", | ||
"version": "1.0.0", | ||
"main": "index.js", | ||
"scripts": { | ||
"start": "node index.js", | ||
"dev": "node --watch index.js" | ||
}, | ||
"dependencies": { | ||
"ws": "^8.18.0" | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
'use strict' | ||
|
||
const { setTimeout: wait } = require('node:timers/promises') | ||
const fastify = require('fastify') | ||
const fastifyHttpProxy = require('../../../') | ||
|
||
async function main () { | ||
const port = process.env.PORT || 3001 | ||
|
||
const wsReconnect = { | ||
logs: true, | ||
pingInterval: 3_000, | ||
reconnectOnClose: true, | ||
} | ||
|
||
let backup = [] | ||
let lastPong = Date.now() | ||
|
||
// resend messages from last ping | ||
// it may send messages more than once | ||
// in case the target already received messages between last ping and the reconnection | ||
async function resendMessages (target) { | ||
const now = Date.now() | ||
|
||
for (const m of backup) { | ||
if (m.timestamp < lastPong || m.timestamp > now) { | ||
continue | ||
} | ||
console.log(' >>> resending message #', m) | ||
target.send(m.message) | ||
// introduce a small delay to avoid to flood the target | ||
await wait(250) | ||
} | ||
}; | ||
|
||
const wsHooks = { | ||
onPong: () => { | ||
console.log('onPong') | ||
lastPong = Date.now() | ||
// clean backup from the last ping | ||
backup = backup.filter(message => message.timestamp > lastPong) | ||
}, | ||
onIncomingMessage: (source, target, message) => { | ||
const m = message.data.toString() | ||
console.log('onIncomingMessage backup', m) | ||
backup.push({ message: m, timestamp: Date.now() }) | ||
}, | ||
onDisconnect: () => { | ||
console.log('onDisconnect') | ||
backup.length = 0 | ||
}, | ||
onReconnect: (source, target) => { | ||
console.log('onReconnect') | ||
resendMessages(target) | ||
}, | ||
} | ||
|
||
const proxy = fastify({ logger: true }) | ||
proxy.register(fastifyHttpProxy, { | ||
upstream: 'http://localhost:3000/', | ||
websocket: true, | ||
wsUpstream: 'ws://localhost:3000/', | ||
wsReconnect, | ||
wsHooks, | ||
}) | ||
|
||
await proxy.listen({ port }) | ||
} | ||
|
||
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{ | ||
"name": "proxy", | ||
"version": "1.0.0", | ||
"main": "index.js", | ||
"scripts": { | ||
"start": "node index.js", | ||
"dev": "node --watch index.js" | ||
}, | ||
"dependencies": { | ||
"fastify": "^5.2.1" | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
'use strict' | ||
|
||
const { setTimeout: wait } = require('node:timers/promises') | ||
const fastify = require('fastify') | ||
|
||
// unstable service | ||
|
||
async function main () { | ||
const SLOW_START = process.env.SLOW_START || 2_000 | ||
const UNSTABLE_MIN = process.env.UNSTABLE_MIN || 1_000 | ||
const UNSTABLE_MAX = process.env.UNSTABLE_MAX || 10_000 | ||
const BLOCK_TIME = process.env.BLOCK_TIME || 5_000 | ||
|
||
const app = fastify({ logger: true }) | ||
|
||
// slow start | ||
|
||
await wait(SLOW_START) | ||
|
||
app.register(require('@fastify/websocket')) | ||
app.register(async function (app) { | ||
app.get('/', { websocket: true }, (socket) => { | ||
socket.on('message', message => { | ||
let m = message.toString() | ||
console.log('incoming message', m) | ||
m = JSON.parse(m) | ||
|
||
socket.send(JSON.stringify({ | ||
response: m.message | ||
})) | ||
}) | ||
}) | ||
}) | ||
|
||
try { | ||
const port = process.env.PORT || 3000 | ||
await app.listen({ port }) | ||
} catch (err) { | ||
app.log.error(err) | ||
process.exit(1) | ||
} | ||
|
||
if (process.env.STABLE) { | ||
return | ||
} | ||
|
||
function runProblem () { | ||
const problem = process.env.PROBLEM || (Math.random() < 0.5 ? 'crash' : 'block') | ||
const unstabilityTimeout = process.env.UNSTABLE_TIMEOUT || Math.round(UNSTABLE_MIN + Math.random() * (UNSTABLE_MAX - UNSTABLE_MIN)) | ||
|
||
if (problem === 'crash') { | ||
console.log(`Restarting (crash and restart) in ${unstabilityTimeout}ms`) | ||
setTimeout(() => { | ||
console.log('UNHANDLED EXCEPTION') | ||
throw new Error('UNHANDLED EXCEPTION') | ||
}, unstabilityTimeout).unref() | ||
} else { | ||
console.log(`Blocking EL in ${unstabilityTimeout}ms for ${BLOCK_TIME}ms`) | ||
|
||
setTimeout(() => { | ||
console.log('Block EL ...') | ||
const start = performance.now() | ||
while (performance.now() - start < BLOCK_TIME) { | ||
// just block | ||
} | ||
console.log('Block ends') | ||
runProblem() | ||
}, unstabilityTimeout).unref() | ||
} | ||
} | ||
|
||
runProblem() | ||
} | ||
|
||
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
{ | ||
"name": "unstable-target", | ||
"version": "1.0.0", | ||
"main": "index.js", | ||
"scripts": { | ||
"stable": "STABLE=1 node index.js", | ||
"unstable": "forever index.js", | ||
"dev": "node --watch index.js" | ||
}, | ||
"dependencies": { | ||
"fastify": "^5.2.1", | ||
"forever": "^4.0.3" | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.