feat: websocket reconnect #405

Merged
merged 25 commits into from
Feb 19, 2025
31 changes: 31 additions & 0 deletions README.md
Expand Up @@ -227,6 +227,37 @@ It also supports an additional `rewriteRequestHeaders(headers, request)` functio
opening the WebSocket connection. This function should return an object with the given headers.
The default implementation forwards the `cookie` header.

## `wsReconnect`

**Experimental.** (default: `disabled`)

The reconnection feature detects and closes broken connections and reconnects automatically; see [how to detect and close broken connections](https://github.com/websockets/ws#how-to-detect-and-close-broken-connections).
A connection is considered broken if the target does not respond to ping messages or if no data is received from the target.

The `wsReconnect` option contains the configuration for the WebSocket reconnection feature.
To enable the feature, set the `wsReconnect` option to an object with the following properties:

- `pingInterval`: The interval between ping messages in ms (default: `30_000`).
- `maxReconnectionRetries`: The maximum number of reconnection retries (`1` to `Infinity`, default: `Infinity`).
- `reconnectInterval`: The interval between reconnection attempts in ms (default: `1_000`).
- `reconnectDecay`: The decay factor for the reconnection interval (default: `1.5`).
- `connectionTimeout`: The timeout for establishing the connection in ms (default: `5_000`).
- `reconnectOnClose`: Whether to reconnect on close, as long as the connection from the related client to the proxy is active (default: `false`).
- `logs`: Whether to log the reconnection process (default: `false`).

See the example in [examples/reconnection](examples/reconnection).
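
For instance, the options above can be written out as a plain object. The sketch below is not part of the plugin: `documentedDefaults` and `withDocumentedDefaults` are hypothetical names that only mirror the defaults listed in this section, so that the effective configuration is visible in one place. The plugin is expected to apply its own defaults, so passing only the overrides should be enough.

```javascript
'use strict'

// Defaults copied from the option list above (hypothetical helper, not plugin API).
const documentedDefaults = {
  pingInterval: 30_000,
  maxReconnectionRetries: Infinity,
  reconnectInterval: 1_000,
  reconnectDecay: 1.5,
  connectionTimeout: 5_000,
  reconnectOnClose: false,
  logs: false
}

// Merge user overrides on top of the documented defaults.
function withDocumentedDefaults (overrides = {}) {
  return { ...documentedDefaults, ...overrides }
}

// Faster pings, reconnect on close, and logging enabled.
const wsReconnect = withDocumentedDefaults({
  pingInterval: 3_000,
  reconnectOnClose: true,
  logs: true
})

console.log(wsReconnect)

// The object is then passed at registration, for example:
// proxy.register(fastifyHttpProxy, {
//   upstream: 'http://localhost:3000/',
//   websocket: true,
//   wsReconnect
// })
```

Spelling out every value is optional; it mainly helps when reviewing how long a broken connection can go unnoticed (`pingInterval`) and how long a reconnection attempt may take (`connectionTimeout`).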

## `wsHooks`

The following hooks are available on WebSocket events. Note that **all hooks are synchronous**.

- `onIncomingMessage`: A hook function called when a message is received from the client: `onIncomingMessage(source, target, { data, binary })` (default: `undefined`).
- `onOutgoingMessage`: A hook function called when a message is received from the target: `onOutgoingMessage(source, target, { data, binary })` (default: `undefined`).
- `onConnect`: A hook function called when the connection is established: `onConnect(source, target)` (default: `undefined`).
- `onDisconnect`: A hook function called when the connection is closed: `onDisconnect(source)` (default: `undefined`).
- `onReconnect`: A hook function called when the connection is re-established: `onReconnect(source, target)` (default: `undefined`). It is called only if the reconnection feature is enabled.
- `onPong`: A hook function called when the target responds to a ping: `onPong(source, target)` (default: `undefined`). It is called only if the reconnection feature is enabled.
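
A minimal sketch of a `wsHooks` object, using only the signatures listed above. The `stats` counters and the `resubscribe` message are illustrative assumptions, and the stub `source` and `target` objects at the end only stand in for the sockets the proxy would pass. Because hooks are synchronous, asynchronous work started from a hook is not awaited and has to handle its own errors.

```javascript
'use strict'

// Illustrative counters (assumption, not part of the plugin).
const stats = { incoming: 0, outgoing: 0, reconnects: 0 }

const wsHooks = {
  onIncomingMessage (source, target, { data, binary }) {
    stats.incoming++
  },
  onOutgoingMessage (source, target, { data, binary }) {
    stats.outgoing++
  },
  onReconnect (source, target) {
    stats.reconnects++
    // Hooks are synchronous: start async work without awaiting it,
    // and catch its errors here so they do not go unhandled.
    Promise.resolve()
      .then(() => target.send(JSON.stringify({ type: 'resubscribe' })))
      .catch(err => console.error('resubscribe failed', err))
  }
}

// Stubs standing in for the client and target sockets.
const sent = []
const source = {}
const target = { send: message => sent.push(message) }

wsHooks.onIncomingMessage(source, target, { data: Buffer.from('hello'), binary: false })
wsHooks.onReconnect(source, target)

console.log(stats)
```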

## Benchmarks

The following benchmarks were generated on a dedicated server with an Intel(R) Core(TM) i7-7700 CPU @ 3.60GHz and 64GB of RAM:
60 changes: 60 additions & 0 deletions examples/reconnection/ReconnectionExample.md
@@ -0,0 +1,60 @@
# Reconnection Example

This example demonstrates how to use the reconnection feature of the proxy.

It simulates an unstable target service that is slow to start, becomes unresponsive when its event loop is blocked, and crashes and restarts.

The goal is to ensure a more resilient and customizable integration, minimizing disruptions caused by connection instability.


## How to run

Run the unstable target

```
cd examples/reconnection/unstable-target
npm run unstable
```

Run the proxy

```
cd examples/reconnection/proxy
npm run start
```

Then run the client

```
cd examples/reconnection/client
npm run start
```

---

## How it works

### Proxy Connection Monitoring and Recovery

The proxy monitors the target connection using a ping/pong mechanism. If a pong response does not arrive on time, the connection is closed, and the proxy attempts to reconnect.

If the target service crashes, the connection may close either gracefully or abruptly. Regardless of how the disconnection occurs, the proxy detects the connection loss and initiates a reconnection attempt.
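
The ping/pong check described above can be sketched as follows. This is not the proxy's actual implementation: `createHeartbeat` and `FakeSocket` are hypothetical names, and the logic follows the general pattern from the `ws` README (mark the connection as not alive before each ping, mark it alive again on `pong`). The fake socket assumes that only `ping()`, `terminate()` and the `'pong'` event are needed.

```javascript
'use strict'

const { EventEmitter } = require('node:events')

// Returns a `tick` function meant to be called every `pingInterval` ms.
function createHeartbeat (socket, onBroken) {
  let alive = true
  socket.on('pong', () => { alive = true })

  return function tick () {
    if (!alive) {
      // No pong since the previous ping: consider the connection broken.
      socket.terminate()
      onBroken()
      return
    }
    alive = false
    socket.ping()
  }
}

// Fake socket standing in for a `ws` WebSocket.
class FakeSocket extends EventEmitter {
  constructor (responsive) {
    super()
    this.responsive = responsive
    this.terminated = false
  }

  ping () {
    // A responsive target answers immediately; a frozen one never does.
    if (this.responsive) this.emit('pong')
  }

  terminate () {
    this.terminated = true
  }
}

const socket = new FakeSocket(true)
let broken = false
const tick = createHeartbeat(socket, () => { broken = true })

tick() // ping answered
socket.responsive = false // the target freezes
tick() // ping sent, no pong arrives
tick() // previous ping unanswered: terminate and report

console.log({ terminated: socket.terminated, broken })
```

With a real socket, `tick` would be scheduled with `setInterval(tick, pingInterval)`, and `onBroken` is where a reconnection attempt would start.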

### Connection Stability

- The connection between the client and the proxy remains unaffected by an unstable target.
- The connection between the proxy and the target may be closed due to:
  - The target failing to respond to ping messages, even if the connection is still technically open (e.g., due to a freeze or blockage).
  - The target crashing and restarting.

### Handling Data Loss During Reconnection

The proxy supports hooks to manage potential data loss during reconnection. These hooks allow for custom logic to ensure message integrity when resending data from the client to the target.

Examples of how hooks can be used based on the target service type:

- GraphQL subscriptions: Resend the subscription from the last received message.
- Message brokers: Resend messages starting from the last successfully processed message.

In this example, the proxy resends the messages received since the last pong, without any additional logic.
Resending from the last pong ensures that the target does not miss any messages, but some messages may be delivered more than once.
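
The idea of resending from the last pong can be sketched as a small buffer. `createBackup` is a hypothetical helper, not part of the proxy or of this example's code; timestamps are passed in explicitly so the behaviour is easy to follow.

```javascript
'use strict'

// Keeps messages until a pong suggests the target was alive to receive them.
function createBackup () {
  let messages = []
  let lastPong = 0

  return {
    // Remember a message forwarded to the target.
    record (message, timestamp) {
      messages.push({ message, timestamp })
    },
    // A pong arrived: drop the messages sent before it.
    onPong (timestamp) {
      lastPong = timestamp
      messages = messages.filter(m => m.timestamp >= lastPong)
    },
    // Messages to resend after a reconnection.
    pending () {
      return messages.map(m => m.message)
    }
  }
}

const backup = createBackup()
backup.record('m1', 100)
backup.record('m2', 200)
backup.onPong(250)
backup.record('m3', 300)
backup.record('m4', 400)

console.log(backup.pending())
```

Here `m3` and `m4` are resent after a reconnection even if the target had already received them before failing, which is why duplicates are possible and the target should tolerate them.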
75 changes: 75 additions & 0 deletions examples/reconnection/client/index.js
@@ -0,0 +1,75 @@
'use strict'

const WebSocket = require('ws')

const port = process.env.PORT || 3001

// connect to proxy

const url = `ws://localhost:${port}/`
const ws = new WebSocket(url)
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true })

client.setEncoding('utf8')

// send an incrementing message every second
let i = 1
setInterval(() => {
  client.write(JSON.stringify({
    message: i
  }))
  i++
}, 1000).unref()

// number of responses received for each message
const responses = {}

client.on('data', message => {
  const data = JSON.parse(message)
  console.log('Received', data)
  responses[data.response] = responses[data.response] ? responses[data.response] + 1 : 1
})

client.on('error', error => {
  console.log('Error')
  console.error(error)
})

client.on('close', () => {
  console.log('\n\n\nConnection closed')

  console.log('\n\n\nResponses')
  // check every message sent so far, so that missing responses are reported too
  for (let key = 1; key < i; key++) {
    if (!responses[key]) {
      console.log('missing', key)
    } else if (responses[key] !== 1) {
      console.log('extra messages', key, responses[key])
    }
  }
})

// the events below are emitted by the WebSocket itself, not by the duplex stream

ws.on('unexpected-response', (request, response) => {
  console.log('Unexpected response')
  console.error(response.statusCode)
  // with a listener registered, the handshake is no longer aborted automatically
  ws.terminate()
})

ws.on('redirect', (url) => {
  console.log('Redirect')
  console.error(url)
})

ws.on('upgrade', (response) => {
  console.log('Upgrade')
  console.error(response.headers)
})

ws.on('ping', (data) => {
  console.log('Ping')
  console.error(data.toString())
})

ws.on('pong', (data) => {
  console.log('Pong')
  console.error(data.toString())
})

process.on('SIGINT', () => {
  client.end()
})
12 changes: 12 additions & 0 deletions examples/reconnection/client/package.json
@@ -0,0 +1,12 @@
{
  "name": "client",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "start": "node index.js",
    "dev": "node --watch index.js"
  },
  "dependencies": {
    "ws": "^8.18.0"
  }
}
70 changes: 70 additions & 0 deletions examples/reconnection/proxy/index.js
@@ -0,0 +1,70 @@
'use strict'

const { setTimeout: wait } = require('node:timers/promises')
const fastify = require('fastify')
const fastifyHttpProxy = require('../../../')

async function main () {
  const port = process.env.PORT || 3001

  const wsReconnect = {
    logs: true,
    pingInterval: 3_000,
    reconnectOnClose: true,
  }

  let backup = []
  let lastPong = Date.now()

  // resend messages from the last pong
  // it may send messages more than once
  // in case the target already received messages between the last pong and the reconnection
  async function resendMessages (target) {
    const now = Date.now()

    for (const m of backup) {
      if (m.timestamp < lastPong || m.timestamp > now) {
        continue
      }
      console.log(' >>> resending message #', m)
      target.send(m.message)
      // introduce a small delay to avoid flooding the target
      await wait(250)
    }
  }

  const wsHooks = {
    onPong: () => {
      console.log('onPong')
      lastPong = Date.now()
      // drop the messages sent before this pong, same boundary as in resendMessages
      backup = backup.filter(message => message.timestamp >= lastPong)
    },
    onIncomingMessage: (source, target, message) => {
      const m = message.data.toString()
      console.log('onIncomingMessage backup', m)
      backup.push({ message: m, timestamp: Date.now() })
    },
    onDisconnect: () => {
      console.log('onDisconnect')
      backup.length = 0
    },
    onReconnect: (source, target) => {
      console.log('onReconnect')
      // hooks are synchronous: do not await, but report failures
      resendMessages(target).catch(err => console.error('resend failed', err))
    },
  }

  const proxy = fastify({ logger: true })
  proxy.register(fastifyHttpProxy, {
    upstream: 'http://localhost:3000/',
    websocket: true,
    wsUpstream: 'ws://localhost:3000/',
    wsReconnect,
    wsHooks,
  })

  await proxy.listen({ port })
}

main()
12 changes: 12 additions & 0 deletions examples/reconnection/proxy/package.json
@@ -0,0 +1,12 @@
{
  "name": "proxy",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "start": "node index.js",
    "dev": "node --watch index.js"
  },
  "dependencies": {
    "fastify": "^5.2.1"
  }
}
75 changes: 75 additions & 0 deletions examples/reconnection/unstable-target/index.js
@@ -0,0 +1,75 @@
'use strict'

const { setTimeout: wait } = require('node:timers/promises')
const fastify = require('fastify')

// unstable service

async function main () {
  // environment variables are strings: convert them so that the arithmetic below is numeric
  const SLOW_START = Number(process.env.SLOW_START ?? 2_000)
  const UNSTABLE_MIN = Number(process.env.UNSTABLE_MIN ?? 1_000)
  const UNSTABLE_MAX = Number(process.env.UNSTABLE_MAX ?? 10_000)
  const BLOCK_TIME = Number(process.env.BLOCK_TIME ?? 5_000)

  const app = fastify({ logger: true })

  // slow start

  await wait(SLOW_START)

  app.register(require('@fastify/websocket'))
  app.register(async function (app) {
    app.get('/', { websocket: true }, (socket) => {
      // echo back the number of each incoming message
      socket.on('message', message => {
        let m = message.toString()
        console.log('incoming message', m)
        m = JSON.parse(m)

        socket.send(JSON.stringify({
          response: m.message
        }))
      })
    })
  })

  try {
    const port = process.env.PORT || 3000
    await app.listen({ port })
  } catch (err) {
    app.log.error(err)
    process.exit(1)
  }

  if (process.env.STABLE) {
    return
  }

  function runProblem () {
    const problem = process.env.PROBLEM || (Math.random() < 0.5 ? 'crash' : 'block')
    const unstabilityTimeout = process.env.UNSTABLE_TIMEOUT
      ? Number(process.env.UNSTABLE_TIMEOUT)
      : Math.round(UNSTABLE_MIN + Math.random() * (UNSTABLE_MAX - UNSTABLE_MIN))

    if (problem === 'crash') {
      console.log(`Restarting (crash and restart) in ${unstabilityTimeout}ms`)
      setTimeout(() => {
        console.log('UNHANDLED EXCEPTION')
        throw new Error('UNHANDLED EXCEPTION')
      }, unstabilityTimeout).unref()
    } else {
      console.log(`Blocking EL in ${unstabilityTimeout}ms for ${BLOCK_TIME}ms`)

      setTimeout(() => {
        console.log('Block EL ...')
        const start = performance.now()
        while (performance.now() - start < BLOCK_TIME) {
          // just block
        }
        console.log('Block ends')
        runProblem()
      }, unstabilityTimeout).unref()
    }
  }

  runProblem()
}

main()
14 changes: 14 additions & 0 deletions examples/reconnection/unstable-target/package.json
@@ -0,0 +1,14 @@
{
  "name": "unstable-target",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "stable": "STABLE=1 node index.js",
    "unstable": "forever index.js",
    "dev": "node --watch index.js"
  },
  "dependencies": {
    "fastify": "^5.2.1",
    "forever": "^4.0.3"
  }
}