Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/calm-adults-roll.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@tus/server": patch
"@tus/utils": patch
---

Consistent cancellation across streams and locks, fixing lock on file never being unlocked when the request ends prematurely.
34 changes: 19 additions & 15 deletions packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import EventEmitter from 'node:events'
import stream from 'node:stream/promises'
import {addAbortSignal, PassThrough} from 'node:stream'
import {PassThrough, Readable} from 'node:stream'
import type http from 'node:http'

import type {ServerOptions} from '../types'
Expand Down Expand Up @@ -121,15 +121,15 @@ export class BaseHandler extends EventEmitter {

const lock = locker.newLock(id)

await lock.lock(() => {
await lock.lock(context.signal, () => {
context.cancel()
})

return lock
}

protected writeToStore(
req: http.IncomingMessage,
data: Readable,
upload: Upload,
maxFileSize: number,
context: CancellationContext
Expand All @@ -145,16 +145,25 @@ export class BaseHandler extends EventEmitter {
// Create a PassThrough stream as a proxy to manage the request stream.
// This allows for aborting the write process without affecting the incoming request stream.
const proxy = new PassThrough()
addAbortSignal(context.signal, proxy)

// gracefully terminate the proxy stream when the request is aborted
const onAbort = () => {
data.unpipe(proxy)

if (!proxy.closed) {
proxy.end()
}
}
context.signal.addEventListener('abort', onAbort, {once: true})

proxy.on('error', (err) => {
req.unpipe(proxy)
data.unpipe(proxy)
reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
})

const postReceive = throttle(
(offset: number) => {
this.emit(EVENTS.POST_RECEIVE_V2, req, {...upload, offset})
this.emit(EVENTS.POST_RECEIVE_V2, data, {...upload, offset})
},
this.options.postReceiveInterval,
{leading: false}
Expand All @@ -166,23 +175,18 @@ export class BaseHandler extends EventEmitter {
postReceive(tempOffset)
})

req.on('error', () => {
if (!proxy.closed) {
// we end the stream gracefully here so that we can upload the remaining bytes to the store
// as an incompletePart
proxy.end()
}
})

// Pipe the request stream through the proxy. We use the proxy instead of the request stream directly
// to ensure that errors in the pipeline do not cause the request stream to be destroyed,
// which would result in a socket hangup error for the client.
stream
.pipeline(req.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
.pipeline(data.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
})
.then(resolve)
.catch(reject)
.finally(() => {
context.signal.removeEventListener('abort', onAbort)
})
})
}

Expand Down
15 changes: 9 additions & 6 deletions packages/server/src/lockers/MemoryLocker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ class MemoryLock implements Lock {
private timeout: number = 1000 * 30
) {}

async lock(requestRelease: RequestRelease): Promise<void> {
async lock(stopSignal: AbortSignal, requestRelease: RequestRelease): Promise<void> {
const abortController = new AbortController()

const abortSignal = AbortSignal.any([stopSignal, abortController.signal])

const lock = await Promise.race([
this.waitTimeout(abortController.signal),
this.acquireLock(this.id, requestRelease, abortController.signal),
this.waitTimeout(abortSignal),
this.acquireLock(this.id, requestRelease, abortSignal),
])

abortController.abort()
Expand All @@ -68,12 +71,12 @@ class MemoryLock implements Lock {
requestRelease: RequestRelease,
signal: AbortSignal
): Promise<boolean> {
const lock = this.locker.locks.get(id)

if (signal.aborted) {
return false
return typeof lock !== 'undefined'
}

const lock = this.locker.locks.get(id)

if (!lock) {
const lock = {
requestRelease,
Expand Down
16 changes: 16 additions & 0 deletions packages/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ export class Server extends EventEmitter {
): Promise<http.ServerResponse | stream.Writable | void> {
const context = this.createContext(req)

// Once the request is closed we abort the context to clean up underline resources
req.on('close', () => {
context.abort()
})

log(`[TusServer] handle: ${req.method} ${req.url}`)
// Allow overriding the HTTP method. The reason for this is
// that some libraries/environments to not support PATCH and
Expand Down Expand Up @@ -289,6 +294,17 @@ export class Server extends EventEmitter {

res.writeHead(status, headers)
res.write(body)

// Abort the context once the response is sent.
// Useful for clean-up when the server uses keep-alive
if (!isAborted) {
res.on('finish', () => {
if (!req.closed) {
context.abort()
}
})
}

return res.end()
}

Expand Down
51 changes: 45 additions & 6 deletions packages/server/test/Locker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ describe('MemoryLocker', () => {
it('will acquire a lock by notifying another to release it', async () => {
const locker = new MemoryLocker()
const lockId = 'upload-id-1'
const abortController = new AbortController()

const cancel = sinon.spy()
const cancel2 = sinon.spy()

const lock1 = locker.newLock(lockId)
const lock2 = locker.newLock(lockId)

await lock1.lock(async () => {
await lock1.lock(abortController.signal, async () => {
await lock1.unlock()
cancel()
})

await lock2.lock(async () => {
await lock2.lock(abortController.signal, async () => {
cancel2()
})

Expand All @@ -32,19 +33,21 @@ describe('MemoryLocker', () => {
const locker = new MemoryLocker({
acquireLockTimeout: 500,
})
const abortController = new AbortController()

const lockId = 'upload-id-1'
const lock = locker.newLock(lockId)

const cancel = sinon.spy()

await lock.lock(async () => {
await lock.lock(abortController.signal, async () => {
cancel()
// We note that the function has been called, but do not
// release the lock
})

try {
await lock.lock(async () => {
await lock.lock(abortController.signal, async () => {
throw new Error('panic should not be called')
})
} catch (e) {
Expand All @@ -57,18 +60,20 @@ describe('MemoryLocker', () => {
it('request lock and unlock', async () => {
const locker = new MemoryLocker()
const lockId = 'upload-id-1'
const abortController = new AbortController()

const lock = locker.newLock(lockId)
const lock2 = locker.newLock(lockId)

const cancel = sinon.spy()
await lock.lock(() => {
await lock.lock(abortController.signal, () => {
cancel()
setTimeout(async () => {
await lock.unlock()
}, 50)
})

await lock2.lock(() => {
await lock2.lock(abortController.signal, () => {
throw new Error('should not be called')
})

Expand All @@ -79,4 +84,38 @@ describe('MemoryLocker', () => {
`request released called more times than expected - ${cancel.callCount}`
)
})

it('will stop trying to acquire the lock if the abort signal is aborted', async () => {
const locker = new MemoryLocker()
const lockId = 'upload-id-1'
const abortController = new AbortController()

const cancel = sinon.spy()
const cancel2 = sinon.spy()

const lock1 = locker.newLock(lockId)
const lock2 = locker.newLock(lockId)

await lock1.lock(abortController.signal, async () => {
// do not unlock when requested
cancel()
})

// Abort signal is aborted after lock2 tries to acquire the lock
setTimeout(() => {
abortController.abort()
}, 100)

try {
await lock2.lock(abortController.signal, async () => {
cancel2()
})
assert(false, 'lock2 should not have been acquired')
} catch (e) {
assert(e === ERRORS.ERR_LOCK_TIMEOUT, `error returned is not correct ${e}`)
}

assert(cancel.callCount > 1, `calls count dont match ${cancel.callCount} !== 1`)
assert(cancel2.callCount === 0, `calls count dont match ${cancel.callCount} !== 1`)
})
})
67 changes: 66 additions & 1 deletion packages/server/test/PatchHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {EventEmitter} from 'node:events'
import {addPipableStreamBody} from './utils'
import {MemoryLocker} from '../src'
import streamP from 'node:stream/promises'
import stream from 'node:stream'
import stream, {PassThrough} from 'node:stream'

describe('PatchHandler', () => {
const path = '/test/output'
Expand Down Expand Up @@ -245,4 +245,69 @@ describe('PatchHandler', () => {
assert.equal(context.signal.aborted, true)
}
})

it('should gracefully terminate request stream when context is cancelled', async () => {
handler = new PatchHandler(store, {path, locker: new MemoryLocker()})

const bodyStream = new PassThrough() // 20kb buffer
const req = addPipableStreamBody(
httpMocks.createRequest({
method: 'PATCH',
url: `${path}/1234`,
body: bodyStream,
})
)

const abortController = new AbortController()
context = {
cancel: () => abortController.abort(),
abort: () => abortController.abort(),
signal: abortController.signal,
}

const res = httpMocks.createResponse({req})
req.headers = {
'upload-offset': '0',
'content-type': 'application/offset+octet-stream',
}
req.url = `${path}/file`

let accumulatedBuffer: Buffer = Buffer.alloc(0)

store.getUpload.resolves(new Upload({id: '1234', offset: 0}))
store.write.callsFake(async (readable: http.IncomingMessage | stream.Readable) => {
const writeStream = new stream.PassThrough()
const chunks: Buffer[] = []

writeStream.on('data', (chunk) => {
chunks.push(chunk) // Accumulate chunks in the outer buffer
})

await streamP.pipeline(readable, writeStream)

accumulatedBuffer = Buffer.concat([accumulatedBuffer, ...chunks])

return writeStream.readableLength
})
store.declareUploadLength.resolves()

await new Promise((resolve, reject) => {
handler.send(req, res, context).then(resolve).catch(reject)

// sends the first 20kb
bodyStream.write(Buffer.alloc(1024 * 20))

// write 15kb
bodyStream.write(Buffer.alloc(1024 * 15))

// simulate that the request was cancelled
setTimeout(() => {
context.abort()
}, 200)
})

// We expect that all the data was written to the store, 35kb
assert.equal(accumulatedBuffer.byteLength, 35 * 1024)
bodyStream.end()
})
})
33 changes: 26 additions & 7 deletions packages/server/test/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type httpMocks from 'node-mocks-http'
import stream from 'node:stream'
import stream, {Readable, Transform, TransformCallback} from 'node:stream'
import type http from 'node:http'

export function addPipableStreamBody<
Expand All @@ -8,21 +8,40 @@ export function addPipableStreamBody<
// Create a Readable stream that simulates the request body
const bodyStream = new stream.Duplex({
read() {
this.push(
mockRequest.body instanceof Buffer
? mockRequest.body
: JSON.stringify(mockRequest.body)
)
this.push(null)
// This function is intentionally left empty since the data flow
// is controlled by event listeners registered outside of this method.
},
})

// Handle cases where the body is a Readable stream
if (mockRequest.body instanceof Readable) {
// Pipe the mockRequest.body to the bodyStream
mockRequest.body.on('data', (chunk) => {
bodyStream.push(chunk) // Push the chunk to the bodyStream
})

mockRequest.body.on('end', () => {
bodyStream.push(null) // Signal the end of the stream
})
} else {
// Handle cases where the body is not a stream (e.g., Buffer or plain object)
const bodyBuffer =
mockRequest.body instanceof Buffer
? mockRequest.body
: Buffer.from(JSON.stringify(mockRequest.body))

// Push the bodyBuffer and signal the end of the stream
bodyStream.push(bodyBuffer)
bodyStream.push(null)
}

// Add the pipe method to the mockRequest
// @ts-ignore
mockRequest.pipe = (dest: stream.Writable) => bodyStream.pipe(dest)

// Add the unpipe method to the mockRequest
// @ts-ignore
mockRequest.unpipe = (dest: stream.Writable) => bodyStream.unpipe(dest)

return mockRequest
}
Loading
Loading