Skip to content

Commit b610929

Browse files
committed
fix: wait for in progress multi part uploads to cancel for pause and cancel
1 parent 956861b commit b610929

File tree

2 files changed

+134
-75
lines changed

2 files changed

+134
-75
lines changed

packages/storage/amplify_storage_s3/example/integration_test/upload_file_test.dart

Lines changed: 100 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -278,58 +278,110 @@ void main() {
278278
});
279279

280280
group('pause, resume, cancel', () {
281-
const size = 1024 * 1024 * 6;
282-
const chars = 'qwertyuiopasdfghjklzxcvbnm';
283-
final content = List.generate(size, (i) => chars[i % 25]).join();
284-
testWidgets('can pause', (_) async {
285-
final fileId = uuid();
286-
final path = 'public/upload-file-pause-$fileId';
287-
final filePath = await createFile(path: fileId, content: content);
288-
addTearDownPath(StoragePath.fromString(path));
289-
final operation = Amplify.Storage.uploadFile(
290-
localFile: AWSFile.fromPath(filePath),
291-
path: StoragePath.fromString(path),
292-
);
293-
await operation.pause();
294-
unawaited(
295-
operation.result.then(
296-
(value) => fail('should not complete after pause'),
297-
),
281+
/// file sizes in mb
282+
const fileSizes = [
283+
1, // non multi part upload
284+
6, // small multi part upload (2 parts)
285+
50, // large multi part upload (10 parts)
286+
];
287+
for (final fileSize in fileSizes) {
288+
final size = 1024 * 1024 * fileSize;
289+
const chars = 'qwertyuiopasdfghjklzxcvbnm';
290+
final content = List.generate(size, (i) => chars[i % 25]).join();
291+
testWidgets(
292+
'can pause (file size: $fileSize mb)',
293+
(_) async {
294+
final fileId = uuid();
295+
final path = 'public/upload-file-pause-$fileId';
296+
final filePath = await createFile(path: fileId, content: content);
297+
StorageTransferState? state;
298+
addTearDownPath(StoragePath.fromString(path));
299+
final operation = Amplify.Storage.uploadFile(
300+
localFile: AWSFile.fromPath(filePath),
301+
path: StoragePath.fromString(path),
302+
onProgress: (progress) {
303+
state = progress.state;
304+
},
305+
);
306+
await operation.pause();
307+
// pause is only supported for multi part uploads (over 5 mb)
308+
// calling .pause() should not throw, but the operation will not
309+
// actually pause.
310+
if (fileSize > 5) {
311+
unawaited(
312+
operation.result.then(
313+
(value) => fail('should not complete after pause'),
314+
),
315+
);
316+
await Future<void>.delayed(const Duration(seconds: 15));
317+
expect(state, StorageTransferState.paused);
318+
await expectLater(
319+
() => Amplify.Storage.downloadData(
320+
path: StoragePath.fromString(path),
321+
).result,
322+
throwsA(isA<StorageNotFoundException>()),
323+
);
324+
}
325+
},
298326
);
299-
await Future<void>.delayed(const Duration(seconds: 15));
300-
});
301327

302-
testWidgets('can resume', (_) async {
303-
final fileId = uuid();
304-
final path = 'public/upload-file-resume-$fileId';
305-
final filePath = await createFile(path: fileId, content: content);
306-
addTearDownPath(StoragePath.fromString(path));
307-
final operation = Amplify.Storage.uploadFile(
308-
localFile: AWSFile.fromPath(filePath),
309-
path: StoragePath.fromString(path),
328+
testWidgets(
329+
'can resume (file size: $fileSize mb)',
330+
(_) async {
331+
final fileId = uuid();
332+
final path = 'public/upload-file-resume-$fileId';
333+
final filePath = await createFile(path: fileId, content: content);
334+
final state = StreamController<StorageTransferState>();
335+
addTearDownPath(StoragePath.fromString(path));
336+
final operation = Amplify.Storage.uploadFile(
337+
localFile: AWSFile.fromPath(filePath),
338+
path: StoragePath.fromString(path),
339+
onProgress: (progress) {
340+
state.sink.add(progress.state);
341+
},
342+
);
343+
await operation.pause();
344+
await operation.resume();
345+
final nextProgressState = await state.stream.first;
346+
expect(nextProgressState, StorageTransferState.inProgress);
347+
final result = await operation.result;
348+
expect(result.uploadedItem.path, path);
349+
final downloadResult = await Amplify.Storage.downloadData(
350+
path: StoragePath.fromString(path),
351+
).result;
352+
expect(downloadResult.bytes, content.codeUnits);
353+
await state.close();
354+
},
310355
);
311-
await operation.pause();
312-
await operation.resume();
313-
final result = await operation.result;
314-
expect(result.uploadedItem.path, path);
315-
});
316356

317-
testWidgets('can cancel', (_) async {
318-
final fileId = uuid();
319-
final path = 'public/upload-file-cancel-$fileId';
320-
final filePath = await createFile(path: fileId, content: content);
321-
addTearDownPath(StoragePath.fromString(path));
322-
final operation = Amplify.Storage.uploadFile(
323-
localFile: AWSFile.fromPath(filePath),
324-
path: StoragePath.fromString(path),
325-
);
326-
final expectException = expectLater(
327-
() => operation.result,
328-
throwsA(isA<StorageOperationCanceledException>()),
329-
);
330-
await operation.cancel();
331-
await expectException;
332-
});
357+
testWidgets('can cancel (file size: $fileSize mb)', (_) async {
358+
final fileId = uuid();
359+
final path = 'public/upload-file-cancel-$fileId';
360+
final filePath = await createFile(path: fileId, content: content);
361+
StorageTransferState? state;
362+
addTearDownPath(StoragePath.fromString(path));
363+
final operation = Amplify.Storage.uploadFile(
364+
localFile: AWSFile.fromPath(filePath),
365+
path: StoragePath.fromString(path),
366+
onProgress: (progress) {
367+
state = progress.state;
368+
},
369+
);
370+
final expectException = expectLater(
371+
() => operation.result,
372+
throwsA(isA<StorageOperationCanceledException>()),
373+
);
374+
await operation.cancel();
375+
expect(state, StorageTransferState.canceled);
376+
await expectException;
377+
await expectLater(
378+
() => Amplify.Storage.downloadData(
379+
path: StoragePath.fromString(path),
380+
).result,
381+
throwsA(isA<StorageNotFoundException>()),
382+
);
383+
});
384+
}
333385
});
334386
});
335387

packages/storage/amplify_storage_s3_dart/lib/src/storage_s3_service/service/task/s3_upload_task.dart

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -172,14 +172,11 @@ class S3UploadTask {
172172
int _currentSubTaskId = 0;
173173
final Completer<void> _determineUploadModeCompleter = Completer();
174174
Completer<void>? _uploadPartBatchingCompleter;
175-
Completer<void>? _abortMultipartUploadCompleter;
176175

177176
FutureOr<void> get _uploadModeDetermined =>
178177
_determineUploadModeCompleter.future;
179178
FutureOr<void> get _uploadPartBatchingCompleted =>
180179
_uploadPartBatchingCompleter?.future;
181-
FutureOr<void> get _abortMultipartUploadCompleted =>
182-
_abortMultipartUploadCompleter?.future;
183180

184181
int get _numOfOngoingSubtasks => _ongoingSubtasks.length;
185182
int get _numOfCompletedSubtasks => _completedSubtasks.length;
@@ -263,6 +260,7 @@ class S3UploadTask {
263260
if (!_isMultipartUpload || _state != StorageTransferState.inProgress) {
264261
return;
265262
}
263+
_state = StorageTransferState.paused;
266264

267265
await _uploadPartBatchingCompleted;
268266

@@ -279,6 +277,7 @@ class S3UploadTask {
279277
if (!_isMultipartUpload || _state != StorageTransferState.paused) {
280278
return;
281279
}
280+
_state = StorageTransferState.inProgress;
282281
await _uploadPartBatchingCompleted;
283282

284283
_subtasksStreamSubscription.resume();
@@ -300,6 +299,7 @@ class S3UploadTask {
300299
_state == StorageTransferState.failure) {
301300
return;
302301
}
302+
_state = StorageTransferState.canceled;
303303

304304
if (_isMultipartUpload) {
305305
await _subtasksStreamSubscription.cancel();
@@ -355,10 +355,17 @@ class S3UploadTask {
355355

356356
_state = StorageTransferState.success;
357357
} on CancellationException {
358-
_logger.debug('PutObject HTTP operation has been canceled.');
359-
_state = StorageTransferState.canceled;
360-
_uploadCompleter
361-
.completeError(s3_exception.s3ControllableOperationCanceledException);
358+
// CancellationException is expected when the operation is paused. The
359+
// exception should be swallowed in this case.
360+
if (_state == StorageTransferState.paused) {
361+
_logger.debug(
362+
'PutObject HTTP operation has been paused.',
363+
);
364+
return;
365+
}
366+
_uploadCompleter.completeError(
367+
s3_exception.s3ControllableOperationCanceledException,
368+
);
362369
} on smithy.UnknownSmithyHttpException catch (error, stackTrace) {
363370
_completeUploadWithError(
364371
error.toStorageException(),
@@ -376,6 +383,7 @@ class S3UploadTask {
376383
Future<void> _startMultipartUpload(
377384
AWSFile localFile,
378385
) async {
386+
_state = StorageTransferState.inProgress;
379387
// 1. check if can initiate multipart upload with the given file size
380388
// and create a multipart upload and set its id to _multipartUploadId
381389
try {
@@ -396,29 +404,25 @@ class S3UploadTask {
396404
_subtasksStreamController = StreamController(
397405
onListen: () {
398406
// 3. start the multipart uploading
399-
_state = StorageTransferState.inProgress;
400407
unawaited(_startNextUploadPartsBatch());
401408
_emitTransferProgress();
402409
_determineUploadModeCompleter.complete();
403410
},
404411
onPause: () async {
405-
_state = StorageTransferState.paused;
406-
_cancelOngoingUploadPartOperations(cancelingOnPause: true);
412+
await _cancelOngoingUploadPartOperations(cancelingOnPause: true);
407413
_emitTransferProgress();
408414
},
409415
onResume: () async {
410416
unawaited(_startNextUploadPartsBatch(resumingFromPause: true));
411-
_state = StorageTransferState.inProgress;
412417
_emitTransferProgress();
413418
},
414419
onCancel: () async {
415420
// _streamController.close triggers this callback but we don't
416421
// need to emit canceled state as the upload has completed
417-
if (_state == StorageTransferState.canceled ||
418-
_numOfCompletedSubtasks == _expectedNumOfSubtasks) {
422+
if (_numOfCompletedSubtasks == _expectedNumOfSubtasks) {
419423
return;
420424
}
421-
_cancelOngoingUploadPartOperations();
425+
await _cancelOngoingUploadPartOperations();
422426
await _terminateMultipartUploadOnError(
423427
s3_exception.s3ControllableOperationCanceledException,
424428
isCancel: true,
@@ -690,10 +694,17 @@ class S3UploadTask {
690694
try {
691695
final completedSubtask = await uploadPartRequest;
692696
_subtasksStreamController.add(completedSubtask);
693-
} on CancellationException {
694-
_logger
695-
.debug('Part $partNumber upload HTTP operation has been canceled.');
696697
} on Exception catch (error) {
698+
// Each part upload is canceled during pause/cancel, which results in an
699+
// expected Exception. _terminateMultipartUploadOnError does not need to be
700+
// invoked since it is already invoked when cancel() is invoked.
701+
if (_state == StorageTransferState.canceled ||
702+
_state == StorageTransferState.paused) {
703+
_logger.debug(
704+
'Part $partNumber upload HTTP operation has been ${_state.name}.',
705+
);
706+
return;
707+
}
697708
// May include:
698709
// - exceptions created from smithy.UnknownSmithyHttpException
699710
// - NetworkException
@@ -702,15 +713,17 @@ class S3UploadTask {
702713
}
703714
}
704715

705-
void _cancelOngoingUploadPartOperations({
716+
Future<void> _cancelOngoingUploadPartOperations({
706717
bool cancelingOnPause = false,
707-
}) {
718+
}) async {
719+
final cancelFutures = <Future<void>>[];
708720
for (final operation in _ongoingUploadPartHttpOperations.values) {
709-
operation.smithyOperation.cancel();
721+
cancelFutures.add(operation.smithyOperation.cancel());
710722
if (!cancelingOnPause) {
711723
_ongoingSubtasks.remove(operation.partNumber);
712724
}
713725
}
726+
await Future.wait(cancelFutures);
714727
}
715728

716729
Future<void> _terminateMultipartUploadOnError(
@@ -719,14 +732,10 @@ class S3UploadTask {
719732
}) async {
720733
// in parallel part upload failures will all invoke this function
721734
// use this to avoid invoking AbortMultipartUploadRequest multiple times
722-
await _abortMultipartUploadCompleted;
723-
if (_state == StorageTransferState.canceled ||
724-
_state == StorageTransferState.failure) {
735+
if (_state == StorageTransferState.failure) {
725736
return;
726737
}
727738

728-
_abortMultipartUploadCompleter = Completer();
729-
730739
final request = s3.AbortMultipartUploadRequest.build((builder) {
731740
builder
732741
..bucket = _bucket
@@ -747,8 +756,6 @@ class S3UploadTask {
747756
),
748757
);
749758
}
750-
751-
_abortMultipartUploadCompleter?.complete();
752759
}
753760

754761
void _completeUploadWithError(

0 commit comments

Comments
 (0)