5 changes: 5 additions & 0 deletions .changeset/twenty-crews-punch.md
@@ -0,0 +1,5 @@
---
"@tus/s3-store": patch
---

Fix an unhandled promise rejection when uploading a part fails; previously we returned too early, leaving other parts running in the background.
1 change: 1 addition & 0 deletions packages/s3-store/package.json
@@ -16,6 +16,7 @@
],
"scripts": {
"build": "tsc --build",
"pretest": "tsc --build",
"test": "mocha --timeout 40000 --exit --extension ts --require ts-node/register"
},
"dependencies": {
14 changes: 12 additions & 2 deletions packages/s3-store/src/index.ts
@@ -409,19 +409,25 @@ export class S3Store extends DataStore {
bytesUploaded += partSize
resolve()
} catch (error) {
// If there's an error, destroy the splitter to stop processing more chunks
splitterStream.destroy(error)
reject(error)
} finally {
fsProm.rm(path).catch(() => {
/* ignore */
})
acquiredPermit?.release()
acquiredPermit?.release().catch(() => {
/* ignore */
})
}
})

promises.push(deferred)
})
.on('chunkError', () => {
permit?.release()
permit?.release().catch(() => {
/* ignore */
})
})

try {
@@ -437,6 +443,10 @@

promises.push(Promise.reject(error))
} finally {
// Wait for all promises. We don't want to return
// early and have promises running in the background.
await Promise.allSettled(promises)
// But we do want to reject the promise if any of the promises reject.
await Promise.all(promises)
}

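The heart of the fix is the pair of awaits in the `finally` block above: `Promise.allSettled` waits for every in-flight part upload to settle (and observes any rejection), and only then does `Promise.all` re-throw the first failure to the caller. A minimal standalone sketch of that pattern, with illustrative names rather than the store's actual API:

```ts
// Minimal sketch (assumed names, not @tus/s3-store code) of the
// allSettled-then-all pattern used in the finally block above.
async function awaitAllParts(partUploads: Promise<unknown>[]): Promise<void> {
  // First, wait for every upload to settle so none keeps running in the
  // background after we return, and every rejection is observed.
  await Promise.allSettled(partUploads)
  // All uploads have settled; if any rejected, surface the first error.
  await Promise.all(partUploads)
}
```

With only `await Promise.all(partUploads)`, the function would return as soon as the first upload rejected while the remaining uploads kept running in the background, which is the behaviour the changeset describes.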
36 changes: 34 additions & 2 deletions packages/s3-store/test/index.ts
@@ -278,14 +278,14 @@ describe('S3DataStore', () => {
}
})

it('should use default maxMultipartParts when not specified', function () {
it('should use default maxMultipartParts when not specified', () => {
const store = new S3Store({
s3ClientConfig,
})
assert.equal(store.maxMultipartParts, 10000)
})

it('should use custom maxMultipartParts when specified', function () {
it('should use custom maxMultipartParts when specified', () => {
const customMaxParts = 5000
const store = new S3Store({
s3ClientConfig,
@@ -294,6 +294,38 @@
assert.equal(store.maxMultipartParts, customMaxParts)
})

it('should handle multiple concurrent part uploads with some failures', async () => {
const store = new S3Store({
partSize: 5 * 1024 * 1024,
maxConcurrentPartUploads: 3, // Allow 3 concurrent uploads
s3ClientConfig,
})

const size = 15 * 1024 * 1024 // 15MB will be split into 3 parts
const upload = new Upload({
id: shared.testId('concurrent-failures'),
size,
offset: 0,
})

// @ts-expect-error private method
const uploadPartStub = sinon.stub(store, 'uploadPart')

// Make the second part upload fail
uploadPartStub.onCall(0).resolves('etag1')
uploadPartStub.onCall(1).rejects(new Error('Part 2 upload failed'))
uploadPartStub.onCall(2).resolves('etag3')

await store.create(upload)

try {
await store.write(Readable.from(Buffer.alloc(size)), upload.id, upload.offset)
assert.fail('Expected write to fail but it succeeded')
} catch (error) {
assert.equal(uploadPartStub.callCount, 2, '2 parts should have been attempted')
}
})

shared.shouldHaveStoreMethods()
shared.shouldCreateUploads()
shared.shouldRemoveUploads() // Termination extension