Skip to content

Commit d6c6c3a

Browse files
authored
Abort HTTP connection when stopping changesReader (#310)
1 parent d7617d6 commit d6c6c3a

File tree

6 files changed

+62
-2
lines changed

6 files changed

+62
-2
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ reliable, resumable changes feed follower, then you need the `changesReader`.
650650
651651
There are three ways to start listening to the changes feed:
652652
653-
1. `changesReader.start()` - to listen to changes indefinitely by repeated "long poll" requests. This mode continues to poll for changes forever.
653+
1. `changesReader.start()` - to listen to changes indefinitely by repeated "long poll" requests. This mode continues to poll for changes until `changesReader.stop()` is called, at which point any active long poll will be canceled.
654654
2. `changesReader.get()` - to listen to changes until the end of the changes feed is reached, by repeated "long poll" requests. Once a response with zero changes is received, the 'end' event will indicate the end of the changes and polling will stop.
655655
3. `changesReader.spool()` - listen to changes in one long HTTP request. (as opposed to repeated round trips) - spool is faster but less reliable.
656656

lib/changesreader.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const EventEmitter = require('events').EventEmitter
2+
const AbortController = global.AbortController || require('node-abort-controller').AbortController
23
const stream = require('stream')
34
const EVENT_BATCH = 'batch'
45
const EVENT_CHANGE = 'change'
@@ -103,6 +104,7 @@ class ChangesReader {
103104
this.qs = {} // extra querystring parameters
104105
this.selector = null
105106
this.paused = false
107+
this.abortController = null
106108
}
107109

108110
pause () {
@@ -116,6 +118,9 @@ class ChangesReader {
116118
// prevent another poll happening
117119
stop () {
118120
this.continue = false
121+
if (this.abortController) {
122+
this.abortController.abort()
123+
}
119124
}
120125

121126
// sleep, promise style
@@ -148,11 +153,13 @@ class ChangesReader {
148153
const work = async () => {
149154
do {
150155
if (!self.paused) {
156+
self.abortController = new AbortController()
151157
// formulate changes feed longpoll HTTP request
152158
const req = {
153159
method: 'post',
154160
db: self.db,
155161
path: '_changes',
162+
signal: self.abortController.signal,
156163
qs: {
157164
feed: 'longpoll',
158165
timeout: self.timeout,
@@ -174,6 +181,7 @@ class ChangesReader {
174181
// make HTTP request to get up to batchSize changes from the feed
175182
try {
176183
const data = await self.request(req)
184+
self.abortController = null
177185
delay = 0
178186

179187
// update the since state

lib/nano.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,16 @@ module.exports = exports = function dbScope (cfg) {
129129
statusCode: statusCode
130130
}, response.headers)
131131
if (!response.status) {
132+
if (axios.isCancel(response)) {
133+
if (resolve) {
134+
resolve('canceled')
135+
}
136+
if (callback) {
137+
callback(null, 'canceled', responseHeaders)
138+
}
139+
return
140+
}
141+
132142
log({ err: 'socket', body: body, headers: responseHeaders })
133143
if (reject) {
134144
reject(new Error(`error happened in your connection. Reason: ${response.message}`))
@@ -267,6 +277,10 @@ module.exports = exports = function dbScope (cfg) {
267277
// https://github.com/mikeal/request#requestjar
268278
const isJar = opts.jar || cfg.jar || (cfg.requestDefaults && cfg.requestDefaults.jar)
269279

280+
if (opts.signal) {
281+
req.signal = opts.signal
282+
}
283+
270284
if (isJar) {
271285
req.jar = cookieJar
272286
req.withCredentials = true

package-lock.json

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"http-cookie-agent": "^1.0.5",
2121
"@types/tough-cookie": "^4.0.0",
2222
"axios": "^0.26.1",
23+
"node-abort-controller": "^3.0.1",
2324
"qs": "^6.10.3",
2425
"tough-cookie": "^4.0.0"
2526
},
@@ -48,4 +49,4 @@
4849
"jest"
4950
]
5051
}
51-
}
52+
}

test/document.changesreader.test.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,3 +505,29 @@ test('should survive malformed JSON - db.changesReader.start', async () => {
505505
})
506506
})
507507
}, 10000)
508+
509+
test('should cancel HTTP connection as soon as stop is called', async () => {
510+
const changeURL = `/${DBNAME}/_changes`
511+
nock(COUCH_URL)
512+
.post(changeURL)
513+
.query({ feed: 'longpoll', timeout: 60000, since: 'now', limit: 100, include_docs: false })
514+
.reply(200, { results: [], last_seq: '1-0', pending: 0 })
515+
.post(changeURL)
516+
.query({ feed: 'longpoll', timeout: 60000, since: '1-0', limit: 100, include_docs: false })
517+
.delay(60000)
518+
.reply(500)
519+
const db = nano.db.use(DBNAME)
520+
const cr = db.changesReader.start()
521+
await new Promise((resolve, reject) => {
522+
cr.on('seq', function (seq) {
523+
setTimeout(function () {
524+
// give the next http connection a chance to be established
525+
db.changesReader.stop()
526+
}, 200)
527+
})
528+
529+
cr.on('end', function () {
530+
resolve()
531+
})
532+
})
533+
})

0 commit comments

Comments
 (0)