From 42e2ef05c29a996af3a362fd386fbe72c030d4a3 Mon Sep 17 00:00:00 2001 From: Murderlon Date: Tue, 25 Feb 2025 14:47:00 +0100 Subject: [PATCH 1/3] @tus/s3-store: fix unhandled promise rejection --- packages/s3-store/package.json | 1 + packages/s3-store/src/index.ts | 12 ++++++++--- packages/s3-store/test/index.ts | 36 +++++++++++++++++++++++++++++++-- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/packages/s3-store/package.json b/packages/s3-store/package.json index e913379a..c58be208 100644 --- a/packages/s3-store/package.json +++ b/packages/s3-store/package.json @@ -16,6 +16,7 @@ ], "scripts": { "build": "tsc --build", + "pretest": "tsc --build", "test": "mocha --timeout 40000 --exit --extension ts --require ts-node/register" }, "dependencies": { diff --git a/packages/s3-store/src/index.ts b/packages/s3-store/src/index.ts index 40e1367c..d261a09d 100644 --- a/packages/s3-store/src/index.ts +++ b/packages/s3-store/src/index.ts @@ -409,19 +409,25 @@ export class S3Store extends DataStore { bytesUploaded += partSize resolve() } catch (error) { + // If there's an error, destroy the splitter to stop processing more chunks + splitterStream.destroy(error) reject(error) } finally { fsProm.rm(path).catch(() => { /* ignore */ }) - acquiredPermit?.release() + acquiredPermit?.release().catch(() => { + /* ignore */ + }) } }) promises.push(deferred) }) .on('chunkError', () => { - permit?.release() + permit?.release().catch(() => { + /* ignore */ + }) }) try { @@ -437,7 +443,7 @@ export class S3Store extends DataStore { promises.push(Promise.reject(error)) } finally { - await Promise.all(promises) + await Promise.allSettled(promises) } return bytesUploaded diff --git a/packages/s3-store/test/index.ts b/packages/s3-store/test/index.ts index 4a8e31af..e9042100 100644 --- a/packages/s3-store/test/index.ts +++ b/packages/s3-store/test/index.ts @@ -278,14 +278,14 @@ describe('S3DataStore', () => { } }) - it('should use default maxMultipartParts when not specified', function () { + it('should use default maxMultipartParts when not specified', () => { const store = new S3Store({ s3ClientConfig, }) assert.equal(store.maxMultipartParts, 10000) }) - it('should use custom maxMultipartParts when specified', function () { + it('should use custom maxMultipartParts when specified', () => { const customMaxParts = 5000 const store = new S3Store({ s3ClientConfig, @@ -294,6 +294,38 @@ describe('S3DataStore', () => { assert.equal(store.maxMultipartParts, customMaxParts) }) + it('should handle multiple concurrent part uploads with some failures', async () => { + const store = new S3Store({ + partSize: 5 * 1024 * 1024, + maxConcurrentPartUploads: 3, // Allow 3 concurrent uploads + s3ClientConfig, + }) + + const size = 15 * 1024 * 1024 // 15MB will be split into 3 parts + const upload = new Upload({ + id: shared.testId('concurrent-failures'), + size, + offset: 0, + }) + + // @ts-expect-error private method + const uploadPartStub = sinon.stub(store, 'uploadPart') + + // Make the second part upload fail + uploadPartStub.onCall(0).resolves('etag1') + uploadPartStub.onCall(1).rejects(new Error('Part 2 upload failed')) + uploadPartStub.onCall(2).resolves('etag3') + + await store.create(upload) + + try { + await store.write(Readable.from(Buffer.alloc(size)), upload.id, upload.offset) + assert.fail('Expected write to fail but it succeeded') + } catch (error) { + assert.equal(uploadPartStub.callCount, 2, '2 parts should have been attempted') + } + }) + shared.shouldHaveStoreMethods() shared.shouldCreateUploads() shared.shouldRemoveUploads() // Termination extension From 0dfa4de05eb29ff7b32e9dc475364160e9c0a4ed Mon Sep 17 00:00:00 2001 From: Murderlon Date: Wed, 12 Mar 2025 14:50:37 +0100 Subject: [PATCH 2/3] fix: reject when needed --- packages/s3-store/src/index.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/s3-store/src/index.ts b/packages/s3-store/src/index.ts index d261a09d..5009c602 100644 --- a/packages/s3-store/src/index.ts +++ b/packages/s3-store/src/index.ts @@ -443,7 +443,11 @@ export class S3Store extends DataStore { promises.push(Promise.reject(error)) } finally { + // Wait for all promises. We don't want to return + // early have have promises running in the background. await Promise.allSettled(promises) + // But we do want to reject the promise if any of the promises reject. + await Promise.all(promises) } return bytesUploaded From 7b3f8ded75e02001d4cddb6b4523a1a92693dd04 Mon Sep 17 00:00:00 2001 From: Murderlon Date: Wed, 12 Mar 2025 14:55:17 +0100 Subject: [PATCH 3/3] Add changeset --- .changeset/twenty-crews-punch.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/twenty-crews-punch.md diff --git a/.changeset/twenty-crews-punch.md b/.changeset/twenty-crews-punch.md new file mode 100644 index 00000000..d16faec3 --- /dev/null +++ b/.changeset/twenty-crews-punch.md @@ -0,0 +1,5 @@ +--- +"@tus/s3-store": patch +--- + +Fix unhandled promise rejection when uploading a part fails, in which case we returned too early, leaving other parts running in the background.