Skip to content

Commit cee1a57

Browse files
committed
Improve connection queue handling + fix leak
1 parent b5ceecc commit cee1a57

File tree

2 files changed

+41
-50
lines changed

2 files changed

+41
-50
lines changed

src/connection.js

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ const errorFields = {
4848
82 : 'routine' // R
4949
}
5050

51-
function Connection(options, { onopen = noop, onend = noop, ondrain = noop, onclose = noop } = {}) {
51+
function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose = noop } = {}) {
5252
const {
5353
ssl,
5454
max,
@@ -80,7 +80,6 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
8080
, needsTypes = options.fetch_types
8181
, backendParameters = {}
8282
, statements = {}
83-
, state = 'closed'
8483
, statementId = Math.random().toString(36).slice(2)
8584
, statementCount = 1
8685
, closedDate = 0
@@ -105,13 +104,8 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
105104
, final = null
106105

107106
const connection = {
108-
get state() { return state },
109-
set state(x) {
110-
state = x
111-
state === 'open'
112-
? idleTimer.start()
113-
: idleTimer.cancel()
114-
},
107+
queue: queues.closed,
108+
idleTimer,
115109
connect(query) {
116110
initial = query
117111
reconnect()
@@ -124,6 +118,8 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
124118
id
125119
}
126120

121+
queues.closed && queues.closed.push(connection)
122+
127123
return connection
128124

129125
function createSocket() {
@@ -291,7 +287,7 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
291287

292288
/* c8 ignore next 3 */
293289
function drain() {
294-
ondrain(connection)
290+
onopen(connection)
295291
}
296292

297293
function data(x) {
@@ -362,7 +358,7 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
362358
}
363359

364360
function error(err) {
365-
if (connection.state === 'connecting' && options.host[retries + 1])
361+
if (connection.queue === queues.connecting && options.host[retries + 1])
366362
return
367363

368364
errored(err)
@@ -529,7 +525,7 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
529525
}
530526

531527
while (sent.length && (query = sent.shift()) && (query.active = true) && query.cancelled)
532-
Connection(options, {}).cancel(query.state, query.cancelled.resolve, query.cancelled.reject)
528+
Connection(options).cancel(query.state, query.cancelled.resolve, query.cancelled.reject)
533529

534530
if (query)
535531
return // Consider opening if able and sent.length < 50

src/index.js

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,16 @@ function Postgres(a, b) {
4242
let ending = false
4343

4444
const queries = Queue()
45-
, connections = [...Array(options.max)].map(() => Connection(options, { onopen, onend, ondrain, onclose }))
46-
, closed = Queue(connections)
45+
, connecting = Queue()
4746
, reserved = Queue()
47+
, closed = Queue()
48+
, ended = Queue()
4849
, open = Queue()
4950
, busy = Queue()
5051
, full = Queue()
51-
, ended = Queue()
52-
, connecting = Queue()
53-
, queues = { closed, ended, connecting, reserved, open, busy, full }
52+
, queues = { connecting, reserved, closed, ended, open, busy, full }
53+
54+
const connections = [...Array(options.max)].map(() => Connection(options, queues, { onopen, onend, onclose }))
5455

5556
const sql = Sql(handler)
5657

@@ -229,23 +230,30 @@ function Postgres(a, b) {
229230

230231
function handler(q) {
231232
q.catch(e => uncaughtError || (uncaughtError = e))
232-
c.state === 'full'
233+
c.queue === full
233234
? queries.push(q)
234-
: c.execute(q) || (c.state = 'full', full.push(c))
235+
: c.execute(q) || move(c, full)
235236
}
236237
}
237238

238239
function onexecute(c) {
239-
queues[c.state].remove(c)
240-
c.state = 'reserved'
240+
connection = c
241+
move(c, reserved)
241242
c.reserved = () => queries.length
242243
? c.execute(queries.shift())
243-
: c.state = 'reserved'
244-
reserved.push(c)
245-
connection = c
244+
: move(c, reserved)
246245
}
247246
}
248247

248+
function move(c, queue) {
249+
c.queue.remove(c)
250+
queue.push(c)
251+
c.queue = queue
252+
queue === open
253+
? c.idleTimer.start()
254+
: c.idleTimer.cancel()
255+
}
256+
249257
function json(x) {
250258
return new Parameter(x, 3802)
251259
}
@@ -262,28 +270,27 @@ function Postgres(a, b) {
262270
return query.reject(Errors.connection('CONNECTION_ENDED', options, options))
263271

264272
if (open.length)
265-
return go(open, query)
273+
return go(open.shift(), query)
266274

267275
if (closed.length)
268276
return connect(closed.shift(), query)
269277

270278
busy.length
271-
? go(busy, query)
279+
? go(busy.shift(), query)
272280
: queries.push(query)
273281
}
274282

275-
function go(xs, query) {
276-
const c = xs.shift()
283+
function go(c, query) {
277284
return c.execute(query)
278-
? (c.state = 'busy', busy.push(c))
279-
: (c.state = 'full', full.push(c))
285+
? move(c, busy)
286+
: move(c, full)
280287
}
281288

282289
function cancel(query) {
283290
return new Promise((resolve, reject) => {
284291
query.state
285292
? query.active
286-
? Connection(options, {}).cancel(query.state, resolve, reject)
293+
? Connection(options).cancel(query.state, resolve, reject)
287294
: query.cancelled = { resolve, reject }
288295
: (
289296
queries.remove(query),
@@ -317,21 +324,17 @@ function Postgres(a, b) {
317324
}
318325

319326
function connect(c, query) {
320-
c.state = 'connecting'
321-
connecting.push(c)
327+
move(c, connecting)
322328
c.connect(query)
323329
}
324330

325331
function onend(c) {
326-
queues[c.state].remove(c)
327-
c.state = 'ended'
328-
ended.push(c)
332+
move(c, ended)
329333
}
330334

331335
function onopen(c) {
332-
queues[c.state].remove(c)
333336
if (queries.length === 0)
334-
return (c.state = 'open', open.push(c))
337+
return move(c, open)
335338

336339
let max = Math.ceil(queries.length / (connecting.length + 1))
337340
, ready = true
@@ -340,23 +343,15 @@ function Postgres(a, b) {
340343
ready = c.execute(queries.shift())
341344

342345
ready
343-
? (c.state = 'busy', busy.push(c))
344-
: (c.state = 'full', full.push(c))
345-
}
346-
347-
function ondrain(c) {
348-
full.remove(c)
349-
onopen(c)
346+
? move(c, busy)
347+
: move(c, full)
350348
}
351349

352350
function onclose(c) {
353-
queues[c.state].remove(c)
354-
c.state = 'closed'
351+
move(c, closed)
355352
c.reserved = null
356353
options.onclose && options.onclose(c.id)
357-
queries.length
358-
? connect(c, queries.shift())
359-
: queues.closed.push(c)
354+
queries.length && connect(c, queries.shift())
360355
}
361356
}
362357

0 commit comments

Comments
 (0)