diff --git a/.changeset/twenty-crews-punch.md b/.changeset/twenty-crews-punch.md
new file mode 100644
index 00000000..d16faec3
--- /dev/null
+++ b/.changeset/twenty-crews-punch.md
@@ -0,0 +1,5 @@
+---
+"@tus/s3-store": patch
+---
+
+Fix an unhandled promise rejection when uploading a part fails. Previously we returned too early, leaving other part uploads running in the background.
diff --git a/packages/s3-store/package.json b/packages/s3-store/package.json
index e913379a..c58be208 100644
--- a/packages/s3-store/package.json
+++ b/packages/s3-store/package.json
@@ -16,6 +16,7 @@
   ],
   "scripts": {
     "build": "tsc --build",
+    "pretest": "tsc --build",
     "test": "mocha --timeout 40000 --exit --extension ts --require ts-node/register"
   },
   "dependencies": {
diff --git a/packages/s3-store/src/index.ts b/packages/s3-store/src/index.ts
index 40e1367c..5009c602 100644
--- a/packages/s3-store/src/index.ts
+++ b/packages/s3-store/src/index.ts
@@ -409,19 +409,25 @@ export class S3Store extends DataStore {
             bytesUploaded += partSize
             resolve()
           } catch (error) {
+            // If there's an error, destroy the splitter to stop processing more chunks
+            splitterStream.destroy(error)
             reject(error)
           } finally {
             fsProm.rm(path).catch(() => {
               /* ignore */
             })
-            acquiredPermit?.release()
+            acquiredPermit?.release().catch(() => {
+              /* ignore */
+            })
           }
         })
 
         promises.push(deferred)
       })
       .on('chunkError', () => {
-        permit?.release()
+        permit?.release().catch(() => {
+          /* ignore */
+        })
       })
 
     try {
@@ -437,6 +443,10 @@
       promises.push(Promise.reject(error))
     } finally {
+      // Wait for all promises. We don't want to return
+      // early and have promises running in the background.
+      await Promise.allSettled(promises)
+      // But we do want to reject the promise if any of the promises reject.
       await Promise.all(promises)
     }
 
     return bytesUploaded
diff --git a/packages/s3-store/test/index.ts b/packages/s3-store/test/index.ts
index 4a8e31af..e9042100 100644
--- a/packages/s3-store/test/index.ts
+++ b/packages/s3-store/test/index.ts
@@ -278,14 +278,14 @@ describe('S3DataStore', () => {
     }
   })
 
-  it('should use default maxMultipartParts when not specified', function () {
+  it('should use default maxMultipartParts when not specified', () => {
     const store = new S3Store({
       s3ClientConfig,
     })
     assert.equal(store.maxMultipartParts, 10000)
   })
 
-  it('should use custom maxMultipartParts when specified', function () {
+  it('should use custom maxMultipartParts when specified', () => {
     const customMaxParts = 5000
     const store = new S3Store({
       s3ClientConfig,
@@ -294,6 +294,38 @@
     assert.equal(store.maxMultipartParts, customMaxParts)
   })
 
+  it('should handle multiple concurrent part uploads with some failures', async () => {
+    const store = new S3Store({
+      partSize: 5 * 1024 * 1024,
+      maxConcurrentPartUploads: 3, // Allow 3 concurrent uploads
+      s3ClientConfig,
+    })
+
+    const size = 15 * 1024 * 1024 // 15MB will be split into 3 parts
+    const upload = new Upload({
+      id: shared.testId('concurrent-failures'),
+      size,
+      offset: 0,
+    })
+
+    // @ts-expect-error private method
+    const uploadPartStub = sinon.stub(store, 'uploadPart')
+
+    // Make the second part upload fail
+    uploadPartStub.onCall(0).resolves('etag1')
+    uploadPartStub.onCall(1).rejects(new Error('Part 2 upload failed'))
+    uploadPartStub.onCall(2).resolves('etag3')
+
+    await store.create(upload)
+
+    try {
+      await store.write(Readable.from(Buffer.alloc(size)), upload.id, upload.offset)
+      assert.fail('Expected write to fail but it succeeded')
+    } catch (error) {
+      assert.equal(uploadPartStub.callCount, 2, '2 parts should have been attempted')
+    }
+  })
+
   shared.shouldHaveStoreMethods()
   shared.shouldCreateUploads()
   shared.shouldRemoveUploads() // Termination extension
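
For illustration, here is a minimal, self-contained TypeScript sketch of the settle-then-rethrow pattern the patched finally block relies on. The names (runConcurrently, tasks) are hypothetical, not the actual @tus/s3-store internals: awaiting Promise.allSettled first guarantees every in-flight task has finished, and a second await of Promise.all over the same array then surfaces the first rejection, so no promise is left running or rejecting unobserved.

// Hypothetical sketch of the settle-then-rethrow pattern; not actual
// @tus/s3-store code.
async function runConcurrently(tasks: Array<() => Promise<void>>): Promise<void> {
  const promises: Promise<void>[] = []
  try {
    for (const task of tasks) {
      promises.push(task())
    }
  } finally {
    // Wait for every task to settle so none keeps running (or rejecting
    // unobserved) in the background after we leave this function.
    await Promise.allSettled(promises)
    // All promises are settled now, so Promise.all cannot leave dangling
    // work; it simply rejects with the first error, if any.
    await Promise.all(promises)
  }
}

// Usage: the second task fails, but the error reaches the caller only
// after all three tasks have settled.
runConcurrently([
  async () => console.log('part 1 uploaded'),
  async () => {
    throw new Error('part 2 upload failed')
  },
  async () => console.log('part 3 uploaded'),
]).catch((err: Error) => console.error('write failed:', err.message))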