diff --git a/AWSS3/AWSS3TransferUtility+HeaderHelper.m b/AWSS3/AWSS3TransferUtility+HeaderHelper.m index 36c0b94b444..d26ff04433a 100644 --- a/AWSS3/AWSS3TransferUtility+HeaderHelper.m +++ b/AWSS3/AWSS3TransferUtility+HeaderHelper.m @@ -52,7 +52,7 @@ - (void)assignRequestParameters:(AWSS3GetPreSignedURLRequest *)getPreSignedURLRe @implementation AWSS3TransferUtility (HeaderHelper) --(void) filterAndAssignHeaders:(NSDictionary *) requestHeaders +- (void)filterAndAssignHeaders:(NSDictionary *) requestHeaders getPresignedURLRequest:(AWSS3GetPreSignedURLRequest *) getPresignedURLRequest URLRequest: (NSMutableURLRequest *) URLRequest { diff --git a/AWSS3/AWSS3TransferUtility.h b/AWSS3/AWSS3TransferUtility.h index 177c6578353..7e8036bf498 100644 --- a/AWSS3/AWSS3TransferUtility.h +++ b/AWSS3/AWSS3TransferUtility.h @@ -620,6 +620,30 @@ handleEventsForBackgroundURLSession:(NSString *)identifier expression:(nullable AWSS3TransferUtilityDownloadExpression *)expression completionHandler:(nullable AWSS3TransferUtilityDownloadCompletionHandlerBlock)completionHandler; +///** +// Suspends multipart upload +// @param multipartUploadTask task +// */ +//- (nullable NSError *)suspendMultipartUpload:(AWSS3TransferUtilityMultiPartUploadTask *)multipartUploadTask; + +/** + Suspends all multipart uploads. + @param completionHandler completion handler + */ +- (void)suspendAllMultipartUploadsWithCompletionHandler:(nullable AWSS3TransferUtilityMultiPartUploadSuspendBlock)completionHandler; + +///** +// Resumes a multipart upload. +// @param multiPartUploadTask The task to resume +// */ +//- (nullable NSError *)resumeMultipartUpload:(nonnull AWSS3TransferUtilityMultiPartUploadTask *)multiPartUploadTask; + +/** + Resumes all multipart uploads. + @param completionHandler completion handler + */ +- (void)resumeAllMultipartUploadsWithCompletionHandler:(nullable AWSS3TransferUtilityMultiPartUploadResumeBlock)completionHandler; + /** Assigns progress feedback and completion handler blocks. This method should be called when the app was suspended while the transfer is still happening. diff --git a/AWSS3/AWSS3TransferUtility.m b/AWSS3/AWSS3TransferUtility.m index 812348597b2..5c69bfd4604 100644 --- a/AWSS3/AWSS3TransferUtility.m +++ b/AWSS3/AWSS3TransferUtility.m @@ -68,13 +68,11 @@ - (AWSTask *) validateParameters: (NSString * )bucket key:(NSString *)key accele @end @interface AWSS3TransferUtility (HeaderHelper) --(void) filterAndAssignHeaders:(NSDictionary *) requestHeaders +- (void)filterAndAssignHeaders:(NSDictionary *) requestHeaders getPresignedURLRequest:(AWSS3GetPreSignedURLRequest *) getPresignedURLRequest URLRequest: (NSMutableURLRequest *) URLRequest; @end - - #pragma mark - AWSS3TransferUtility @implementation AWSS3TransferUtility @@ -292,14 +290,14 @@ - (instancetype)initWithConfiguration:(AWSServiceConfiguration *)serviceConfigur delegateQueue:nil]; //If not able to create the session, call completion handler with error and return nil. - if (!_session ) { + if (!_session) { NSString* message = [NSString stringWithFormat:@"Failed to create a NSURLSession for [%@]", _sessionIdentifier]; NSDictionary *userInfo = [NSDictionary dictionaryWithObject:message forKey:@"Message"]; NSError *error = [NSError errorWithDomain:AWSS3TransferUtilityErrorDomain code:AWSS3TransferUtilityErrorClientError userInfo:userInfo]; - if (completionHandler ) { + if (completionHandler) { completionHandler(error); } return nil; @@ -309,7 +307,7 @@ - (instancetype)initWithConfiguration:(AWSServiceConfiguration *)serviceConfigur _configuration = [serviceConfiguration copy]; [_configuration addUserAgentProductToken:AWSS3TransferUtilityUserAgent]; - if (transferUtilityConfiguration ) { + if (transferUtilityConfiguration) { _transferUtilityConfiguration = [transferUtilityConfiguration copy]; } else { @@ -321,8 +319,11 @@ - (instancetype)initWithConfiguration:(AWSServiceConfiguration *)serviceConfigur //Setup internal Data Structures - _taskDictionary = [AWSSynchronizedMutableDictionary new]; - _completedTaskDictionary = [AWSSynchronizedMutableDictionary new]; + AWSSynchronizedMutableDictionary *taskDictionary = [AWSSynchronizedMutableDictionary new]; + AWSSynchronizedMutableDictionary *completedTaskDictionary = taskDictionary.syncedDictionary; + + _taskDictionary = taskDictionary; + _completedTaskDictionary = completedTaskDictionary; //Instantiate the Database Helper self.databaseQueue = [AWSS3TransferUtilityDatabaseHelper createDatabase:self.cacheDirectoryPath]; @@ -362,7 +363,7 @@ - (void) hydrateFromDB:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary NSMutableArray *tasks = [AWSS3TransferUtilityDatabaseHelper getTransferTaskDataFromDB:_sessionIdentifier databaseQueue:_databaseQueue]; //Iterate through the tasks and populate transferRequests and Multipart dictionary. - for( NSMutableDictionary *task in tasks ) { + for( NSMutableDictionary *task in tasks) { NSString *transferType = [task objectForKey:@"transfer_type"]; int sessionTaskID = [[task objectForKey:@"session_task_id"] intValue]; @@ -370,9 +371,9 @@ - (void) hydrateFromDB:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary AWSS3TransferUtilityUploadTask *transferUtilityUploadTask = [self hydrateUploadTask:task sessionIdentifier:self.sessionIdentifier databaseQueue:self.databaseQueue]; //If task is completed, no more processing is required. - if (transferUtilityUploadTask.status == AWSS3TransferUtilityTransferStatusCompleted ) { + if (transferUtilityUploadTask.status == AWSS3TransferUtilityTransferStatusCompleted) { [self.completedTaskDictionary setObject:transferUtilityUploadTask forKey:transferUtilityUploadTask.transferID]; - [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityUploadTask.transferID databaseQueue:self->_databaseQueue]; + [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityUploadTask.transferID databaseQueue:self.databaseQueue]; continue; } //Lodge in temporary Dictionary @@ -383,9 +384,9 @@ - (void) hydrateFromDB:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary AWSS3TransferUtilityDownloadTask *transferUtilityDownloadTask = [self hydrateDownloadTask:task sessionIdentifier:self.sessionIdentifier databaseQueue:self.databaseQueue]; //If task is completed, no more processing is required. - if (transferUtilityDownloadTask.status == AWSS3TransferUtilityTransferStatusCompleted ) { + if (transferUtilityDownloadTask.status == AWSS3TransferUtilityTransferStatusCompleted) { [self.completedTaskDictionary setObject:transferUtilityDownloadTask forKey:transferUtilityDownloadTask.transferID]; - [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityDownloadTask.transferID databaseQueue:self->_databaseQueue]; + [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityDownloadTask.transferID databaseQueue:self.databaseQueue]; continue; } //Lodge in temporary Dictionary for linking @@ -401,7 +402,7 @@ - (void) hydrateFromDB:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary transferUtilityMultiPartUploadTask.status == AWSS3TransferUtilityTransferStatusCancelled || transferUtilityMultiPartUploadTask.status == AWSS3TransferUtilityTransferStatusError) { [self.completedTaskDictionary setObject:transferUtilityMultiPartUploadTask forKey:transferUtilityMultiPartUploadTask.transferID]; - [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityMultiPartUploadTask.transferID databaseQueue:self->_databaseQueue]; + [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityMultiPartUploadTask.transferID databaseQueue:self.databaseQueue]; continue; } @@ -415,14 +416,14 @@ - (void) hydrateFromDB:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary //Get the Master MultiPart record from the Dictionary. AWSS3TransferUtilityMultiPartUploadTask *multiPartUploadTask = [tempMultiPartMasterTaskDictionary objectForKey:subTask.uploadID]; - if ( !multiPartUploadTask ) { + if ( !multiPartUploadTask) { //Couldn't find the multipart upload master record. Must be an orphan part record. Clean up the DB and continue. - [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:subTask.transferID databaseQueue:self->_databaseQueue]; + [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:subTask.transferID databaseQueue:self.databaseQueue]; continue; } //Check if the subTask is is already completed. If it is, add it to the completed parts list, update the progress object and go to the next iteration of the loop - if (subTask.status== AWSS3TransferUtilityTransferStatusCompleted ) { - [multiPartUploadTask.completedPartsSet addObject:subTask]; + if (subTask.status == AWSS3TransferUtilityTransferStatusCompleted) { + [multiPartUploadTask addUploadSubTask:subTask]; continue; } @@ -432,23 +433,22 @@ - (void) hydrateFromDB:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary } } -- (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary - tempTransferDictionary: (NSMutableDictionary *) tempTransferDictionary - completionHandler: (void (^)(NSError *_Nullable error)) completionHandler{ +- (void)linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary + tempTransferDictionary:(NSMutableDictionary *)tempTransferDictionary + completionHandler:(void (^)(NSError *_Nullable error)) completionHandler { //Get tasks from the NSURLSession and reattach to them. //getTasksWithCompletionHandler is an ansynchronous task, so the thread that is calling this method will not be blocked. [self.session getTasksWithCompletionHandler:^(NSArray *dataTasks, NSArray *uploadTasks, NSArray *downloadTasks) { - + //Loop through all the upload Tasks. - for( NSURLSessionUploadTask *task in uploadTasks ) { + for (NSURLSessionUploadTask *task in uploadTasks) { AWSDDLogDebug(@"Iterating through task Identifier [%lu]", (unsigned long)task.taskIdentifier); NSError *taskError = [task error]; - + //Get the Task id obj = [tempTransferDictionary objectForKey:@(task.taskIdentifier)]; - - if ([obj isKindOfClass:[AWSS3TransferUtilityUploadTask class]]) - { + + if ([obj isKindOfClass:[AWSS3TransferUtilityUploadTask class]]) { //Found a upload task. AWSS3TransferUtilityUploadTask *uploadTask = obj; uploadTask.sessionTask = task; @@ -468,11 +468,11 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster filePath:uploadTask.file]; continue; } - + //Check if it is InProgress if (uploadTask.status == AWSS3TransferUtilityTransferStatusInProgress) { //Check if the the underlying NSURLSession task is completed. If so, delete the record from the DB, clean up any temp files and call the completion handler. - if ([task state] == NSURLSessionTaskStateCompleted) { + if (task.state == NSURLSessionTaskStateCompleted) { //Set progress to 100% uploadTask.progress.completedUnitCount = uploadTask.progress.totalUnitCount; uploadTask.status = AWSS3TransferUtilityTransferStatusCompleted; @@ -483,7 +483,7 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster continue; } //If it is in any other status than running, then we need to recover by retrying. - if ([task state] != NSURLSessionTaskStateRunning) { + if (task.state != NSURLSessionTaskStateRunning) { //We think the task in IN_PROGRESS. The underlying task is not running. //Recover the situation by retrying. [self retryUpload:uploadTask]; @@ -494,7 +494,7 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster } //Loop through all the Download tasks - for( NSURLSessionDownloadTask *task in downloadTasks ) { + for( NSURLSessionDownloadTask *task in downloadTasks) { id obj = [tempTransferDictionary objectForKey:@(task.taskIdentifier)]; NSError *taskError = [task error]; @@ -521,7 +521,7 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster //Check if this is in progress if (downloadTask.status == AWSS3TransferUtilityTransferStatusInProgress) { - if ([task state] == NSURLSessionTaskStateCompleted) { + if (task.state == NSURLSessionTaskStateCompleted) { //Set progress to 100% downloadTask.progress.completedUnitCount = downloadTask.progress.totalUnitCount; downloadTask.status = AWSS3TransferUtilityTransferStatusCompleted; @@ -529,7 +529,7 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster continue; } //Check if the underlying task's status is not in Progress. - else if ([task state] != NSURLSessionTaskStateRunning) { + else if (task.state != NSURLSessionTaskStateRunning) { //We think the task in Progress. The underlying task is not in progress. //Recover the situation by retrying [self retryDownload:downloadTask]; @@ -566,13 +566,24 @@ - (void) markTransferAsCompleted:(AWSS3TransferUtilityTask *) transferUtilityTas AWSDDLogError(@"Task [%lu] has an error status [%@]. Marking transfer status as Error", (unsigned long)transferUtilityTask.taskIdentifier, taskError); transferUtilityTask.error = taskError; } - [self.completedTaskDictionary setObject:transferUtilityTask forKey:transferUtilityTask.transferID]; - [self.taskDictionary removeObjectForKey:@(transferUtilityTask.taskIdentifier)]; - [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityTask.transferID databaseQueue:self->_databaseQueue]; + + [self completeTransferUtilityTask:transferUtilityTask]; + + [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityTask.transferID databaseQueue:self.databaseQueue]; } -- (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary - tempTransferDictionary: (NSMutableDictionary *) tempTransferDictionary { +- (void)completeTransferUtilityTask:(AWSS3TransferUtilityTask *)transferUtilityTask { + [AWSSynchronizedMutableDictionary mutateSyncedDictionaries:@[self.taskDictionary, self.completedTaskDictionary] withBlock:^(NSUUID * instanceKey, NSMutableDictionary *dictionary) { + if ([instanceKey isEqual:self.completedTaskDictionary.instanceKey]) { + [dictionary setObject:transferUtilityTask forKey:transferUtilityTask.transferID]; + } else if ([instanceKey isEqual:self.taskDictionary.instanceKey]) { + [dictionary removeObjectForKey:@(transferUtilityTask.taskIdentifier)]; + } + }]; +} + +- (void)handleUnlinkedTransfers:(NSMutableDictionary *)tempMultiPartMasterTaskDictionary + tempTransferDictionary:(NSMutableDictionary *)tempTransferDictionary { //At this point, we have finished iterating through the tasks present in the NSURLSession and removed all the matching ones from the transferRequests dictionary. //If there are any left in the transferRequests list, it means that we think they are running, but NSURLSession doesn't know about them. for (id taskIdentifier in [tempTransferDictionary allKeys]) { @@ -582,49 +593,46 @@ - (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTask { AWSS3TransferUtilityUploadTask *transferUtilityUploadTask = obj; - if (transferUtilityUploadTask.status == AWSS3TransferUtilityTransferStatusCompleted ) { + if (transferUtilityUploadTask.status == AWSS3TransferUtilityTransferStatusCompleted) { [self.completedTaskDictionary setObject:transferUtilityUploadTask forKey:transferUtilityUploadTask.transferID]; - + //Delete the transfer record from the DB - [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityUploadTask.transferID taskIdentifier:[taskIdentifier integerValue] databaseQueue:self->_databaseQueue ]; + [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityUploadTask.transferID taskIdentifier:[taskIdentifier integerValue] databaseQueue:self.databaseQueue ]; AWSDDLogDebug(@"Deleted transfer request from the DB"); } //Check if the transfer is in a paused state and the input file for the transfer exists. - else if ( [[NSFileManager defaultManager] fileExistsAtPath:transferUtilityUploadTask.file] && + else if ([[NSFileManager defaultManager] fileExistsAtPath:transferUtilityUploadTask.file] && transferUtilityUploadTask.status == AWSS3TransferUtilityTransferStatusPaused) { //If the transfer was paused and the local file is still present, create another NSURLSession task and leave it in a paused state - [ self createUploadTask:transferUtilityUploadTask startTransfer:NO]; - } - else { + [self createUploadTask:transferUtilityUploadTask startTransfer:NO]; + } else { //Transfer is in progress according to us, but not present in the NSURLSession. It may have been sucessfully completed. Do not retry. //Mark the status as unknown. The app developer should check to see if the S3 file was uploaded in the app logic and reinitate the transfer if required. transferUtilityUploadTask.status = AWSS3TransferUtilityTransferStatusUnknown; [self.completedTaskDictionary setObject:transferUtilityUploadTask forKey:transferUtilityUploadTask.transferID]; //Delete the transfer record from the DB - [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityUploadTask.transferID taskIdentifier:[taskIdentifier integerValue] databaseQueue:self->_databaseQueue ]; + [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityUploadTask.transferID taskIdentifier:[taskIdentifier integerValue] databaseQueue:self.databaseQueue ]; AWSDDLogDebug(@"Deleted transfer request from the DB"); } } - else if([obj isKindOfClass:[AWSS3TransferUtilityUploadSubTask class]]) - { + else if([obj isKindOfClass:[AWSS3TransferUtilityUploadSubTask class]]) { AWSS3TransferUtilityUploadSubTask *subTask = obj; AWSS3TransferUtilityMultiPartUploadTask *multiPartUploadTask = [tempMultiPartMasterTaskDictionary objectForKey:subTask.uploadID]; [self retryUploadSubTask: multiPartUploadTask subTask:subTask startTransfer:NO]; subTask.status = AWSS3TransferUtilityTransferStatusWaiting; - [multiPartUploadTask.waitingPartsDictionary setObject:subTask forKey:@(subTask.taskIdentifier)]; + [multiPartUploadTask addUploadSubTask:subTask]; } else if ([obj isKindOfClass:[AWSS3TransferUtilityDownloadTask class]]) { - AWSS3TransferUtilityDownloadTask *downloadTask = obj; - - if (downloadTask.status == AWSS3TransferUtilityTransferStatusCompleted ) { + + if (downloadTask.status == AWSS3TransferUtilityTransferStatusCompleted) { [self.completedTaskDictionary setObject:downloadTask forKey:downloadTask.transferID]; - [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:downloadTask.transferID taskIdentifier:[taskIdentifier integerValue] databaseQueue:self->_databaseQueue ]; + [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:downloadTask.transferID taskIdentifier:[taskIdentifier integerValue] databaseQueue:self.databaseQueue]; AWSDDLogDebug(@"Deleted transfer request from DB"); } else if (downloadTask.status == AWSS3TransferUtilityTransferStatusPaused) { //If the transfer was paused, create another NSURLSession task and leave it in an paused state - [ self createDownloadTask:downloadTask startTransfer:NO]; + [self createDownloadTask:downloadTask startTransfer:NO]; } else { //Transfer is in progress according to us, but not present in the NSURLSession. It may have been sucessfully completed. Do not retry. @@ -632,7 +640,7 @@ - (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTask downloadTask.status = AWSS3TransferUtilityTransferStatusUnknown; [self.completedTaskDictionary setObject:downloadTask forKey:downloadTask.transferID]; - [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:downloadTask.transferID taskIdentifier:[taskIdentifier integerValue] databaseQueue:self->_databaseQueue ]; + [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:downloadTask.transferID taskIdentifier:[taskIdentifier integerValue] databaseQueue:self.databaseQueue]; AWSDDLogDebug(@"Deleted transfer request from DB"); } } @@ -652,48 +660,20 @@ - (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTask if (multiPartUploadTask.status == AWSS3TransferUtilityTransferStatusPaused) { continue; } - - long numberOfPartsInProgress = 0; - while (numberOfPartsInProgress < [self.transferUtilityConfiguration.multiPartConcurrencyLimit integerValue]) { - if ([multiPartUploadTask.waitingPartsDictionary count] > 0) { - //Get a part from the waitingList - AWSS3TransferUtilityUploadSubTask *nextSubTask = [[multiPartUploadTask.waitingPartsDictionary allValues] objectAtIndex:0]; - - //Add to inProgress list - [multiPartUploadTask.inProgressPartsDictionary setObject:nextSubTask forKey:@(nextSubTask.taskIdentifier)]; - - //Remove it from the waitingList - [multiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(nextSubTask.taskIdentifier)]; - AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(nextSubTask.taskIdentifier), multiPartUploadTask.uploadID); - [nextSubTask.sessionTask resume]; - - numberOfPartsInProgress++; - continue; - } - break; - } - // move suspended tasks from in progress to waiting to allow multipart upload process to run properly - NSMutableArray *inProgressAndSuspendedTasks = @[].mutableCopy; - - for (AWSS3TransferUtilityUploadSubTask *aSubTask in multiPartUploadTask.inProgressPartsDictionary.allValues) { - if (aSubTask.sessionTask.state == NSURLSessionTaskStateSuspended) { - AWSDDLogDebug(@"Subtask for multipart upload is suspended: %ld", aSubTask.taskIdentifier); - [inProgressAndSuspendedTasks addObject:aSubTask]; - } + while (multiPartUploadTask.isUnderConcurrencyLimit && multiPartUploadTask.hasWaitingTasks) { + //Get a part from the waitingList + AWSS3TransferUtilityUploadSubTask *nextSubTask = [multiPartUploadTask.waitingTasks objectAtIndex:0]; + [multiPartUploadTask moveWaitingTaskToInProgress:nextSubTask startTransfer:YES]; } - for (AWSS3TransferUtilityUploadSubTask *aSubTask in inProgressAndSuspendedTasks) { - [multiPartUploadTask.inProgressPartsDictionary removeObjectForKey:@(aSubTask.taskIdentifier)]; - [multiPartUploadTask.waitingPartsDictionary setObject:aSubTask forKey:@(aSubTask.taskIdentifier)]; - } + [multiPartUploadTask moveInProgressAndSuspendedTasks]; } } --(AWSS3TransferUtilityUploadTask *) hydrateUploadTask: (NSMutableDictionary *) task - sessionIdentifier: (NSString *) sessionIdentifier - databaseQueue: (AWSFMDatabaseQueue *) databaseQueue -{ +- (AWSS3TransferUtilityUploadTask *)hydrateUploadTask:(NSMutableDictionary *)task + sessionIdentifier:(NSString *)sessionIdentifier + databaseQueue:(AWSFMDatabaseQueue *)databaseQueue { AWSS3TransferUtilityUploadTask *transferUtilityUploadTask = [AWSS3TransferUtilityUploadTask new]; transferUtilityUploadTask.nsURLSessionID = sessionIdentifier; transferUtilityUploadTask.databaseQueue = databaseQueue; @@ -713,11 +693,9 @@ -(AWSS3TransferUtilityUploadTask *) hydrateUploadTask: (NSMutableDictionary *) t return transferUtilityUploadTask; } - -- (AWSS3TransferUtilityDownloadTask *) hydrateDownloadTask: (NSMutableDictionary *) task - sessionIdentifier: (NSString *) sessionIdentifier - databaseQueue: (AWSFMDatabaseQueue *) databaseQueue -{ +- (AWSS3TransferUtilityDownloadTask *)hydrateDownloadTask:(NSMutableDictionary *) task + sessionIdentifier:(NSString *) sessionIdentifier + databaseQueue:(AWSFMDatabaseQueue *) databaseQueue { AWSS3TransferUtilityDownloadTask *transferUtilityDownloadTask = [AWSS3TransferUtilityDownloadTask new]; transferUtilityDownloadTask.nsURLSessionID = sessionIdentifier; transferUtilityDownloadTask.databaseQueue = databaseQueue; @@ -737,11 +715,11 @@ - (AWSS3TransferUtilityDownloadTask *) hydrateDownloadTask: (NSMutableDictionary } --( AWSS3TransferUtilityMultiPartUploadTask *) hydrateMultiPartUploadTask: (NSMutableDictionary *) task - sessionIdentifier: (NSString *) sessionIdentifier - databaseQueue: (AWSFMDatabaseQueue *) databaseQueue -{ +- (AWSS3TransferUtilityMultiPartUploadTask *)hydrateMultiPartUploadTask: (NSMutableDictionary *) task + sessionIdentifier:(NSString *) sessionIdentifier + databaseQueue: (AWSFMDatabaseQueue *) databaseQueue { AWSS3TransferUtilityMultiPartUploadTask *transferUtilityMultiPartUploadTask = [AWSS3TransferUtilityMultiPartUploadTask new]; + [transferUtilityMultiPartUploadTask integrateWithTransferUtility:self]; transferUtilityMultiPartUploadTask.nsURLSessionID = sessionIdentifier; transferUtilityMultiPartUploadTask.databaseQueue = databaseQueue; transferUtilityMultiPartUploadTask.transferType = [task objectForKey:@"transfer_type"]; @@ -900,7 +878,7 @@ - (AWSS3TransferUtilityUploadSubTask * ) hydrateMultiPartUploadSubTask:(NSMutabl transferUtilityUploadTask.status = AWSS3TransferUtilityTransferStatusInProgress; //Add to Database - [AWSS3TransferUtilityDatabaseHelper insertUploadTransferRequestInDB:transferUtilityUploadTask databaseQueue:self->_databaseQueue]; + [AWSS3TransferUtilityDatabaseHelper insertUploadTransferRequestInDB:transferUtilityUploadTask databaseQueue:self.databaseQueue]; return [self createUploadTask:transferUtilityUploadTask]; } @@ -949,7 +927,7 @@ - (NSURLSessionUploadTask *)getURLSessionUploadTaskWithRequest:(NSURLRequest *) return [[self.preSignedURLBuilder getPreSignedURL:getPreSignedURLRequest] continueWithBlock:^id(AWSTask *task) { NSURL *presignedURL = task.result; NSError *error = task.error; - if ( error ) { + if (error) { AWSDDLogError(@"Error: %@", error); return [AWSTask taskWithError:error]; } @@ -974,7 +952,7 @@ - (NSURLSessionUploadTask *)getURLSessionUploadTaskWithRequest:(NSURLRequest *) } transferUtilityUploadTask.sessionTask = uploadTask; - if ( startTransfer) { + if (startTransfer) { transferUtilityUploadTask.status = AWSS3TransferUtilityTransferStatusInProgress; } else { @@ -993,7 +971,7 @@ - (NSURLSessionUploadTask *)getURLSessionUploadTaskWithRequest:(NSURLRequest *) eTag:@"" status:transferUtilityUploadTask.status retry_count:transferUtilityUploadTask.retryCount - databaseQueue:self->_databaseQueue]; + databaseQueue:self.databaseQueue]; if (startTransfer) { [uploadTask resume]; } @@ -1005,7 +983,7 @@ - (NSURLSessionUploadTask *)getURLSessionUploadTaskWithRequest:(NSURLRequest *) - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTask { //Remove from taskDictionary - [self.taskDictionary removeObjectForKey:@(transferUtilityUploadTask.taskIdentifier)]; + [self unregisterTaskIdentifier:transferUtilityUploadTask.taskIdentifier]; //Remove from Database [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityUploadTask.transferID taskIdentifier:transferUtilityUploadTask.taskIdentifier databaseQueue:_databaseQueue ]; @@ -1064,6 +1042,7 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa if (!result) { if (completionHandler) { AWSS3TransferUtilityMultiPartUploadTask *uploadTask = [AWSS3TransferUtilityMultiPartUploadTask new]; + [uploadTask integrateWithTransferUtility:self]; uploadTask.bucket = bucket; uploadTask.key = key; completionHandler(uploadTask, error); @@ -1110,36 +1089,36 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa } - (AWSTask *)internalUploadFileUsingMultiPart:(NSURL *)fileURL - bucket:(NSString *)bucket - key:(NSString *)key - contentType:(NSString *)contentType - expression:(AWSS3TransferUtilityMultiPartUploadExpression *)expression - temporaryFileCreated: (BOOL) temporaryFileCreated - completionHandler:(AWSS3TransferUtilityMultiPartUploadCompletionHandlerBlock) completionHandler { - + bucket:(NSString *)bucket + key:(NSString *)key + contentType:(NSString *)contentType + expression:(AWSS3TransferUtilityMultiPartUploadExpression *)expression + temporaryFileCreated: (BOOL) temporaryFileCreated + completionHandler:(AWSS3TransferUtilityMultiPartUploadCompletionHandlerBlock) completionHandler { //Validate input parameters. - AWSTask *error = [self validateParameters:bucket key:key fileURL:fileURL accelerationModeEnabled:self.transferUtilityConfiguration.isAccelerateModeEnabled]; + AWSTask *error = [self validateParameters:bucket key:key fileURL:fileURL accelerationModeEnabled:self.transferUtilityConfiguration.isAccelerateModeEnabled]; if (error) { if (temporaryFileCreated) { [self removeFile:[fileURL path]]; } return error; } - + //Create Expression if required and set values on the object if (!expression) { expression = [AWSS3TransferUtilityMultiPartUploadExpression new]; } - //Override the content type value set in the expression object with the passed in parameter value. + //Override the content type value set in the expression object with the passed in parameter value. if (contentType) { - [expression setValue:contentType forRequestHeader:@"Content-Type"]; + [expression setValue:contentType forRequestHeader:@"Content-Type"]; } - + expression.completionHandler = completionHandler; - + //Create TransferUtility Multipart Upload Task AWSS3TransferUtilityMultiPartUploadTask *transferUtilityMultiPartUploadTask = [AWSS3TransferUtilityMultiPartUploadTask new]; + [transferUtilityMultiPartUploadTask integrateWithTransferUtility:self]; transferUtilityMultiPartUploadTask.nsURLSessionID = self.sessionIdentifier; transferUtilityMultiPartUploadTask.databaseQueue = self.databaseQueue; transferUtilityMultiPartUploadTask.transferType = @"MULTI_PART_UPLOAD"; @@ -1155,7 +1134,7 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa //Get the size of the file and calculate the number of parts. NSError *nsError = nil; NSDictionary *attributes = [[NSFileManager defaultManager] attributesOfItemAtPath:[fileURL path] - error:&nsError]; + error:&nsError]; if (!attributes) { if (transferUtilityMultiPartUploadTask.temporaryFileCreated) { [self removeFile:transferUtilityMultiPartUploadTask.file]; @@ -1177,7 +1156,7 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa uploadRequest.key = key; [AWSS3CreateMultipartUploadRequest propagateHeaderInformation:uploadRequest requestHeaders:transferUtilityMultiPartUploadTask.expression.requestHeaders]; - + //Initiate the multi part return [[self.s3 createMultipartUpload:uploadRequest] continueWithBlock:^id(AWSTask *task) { //Initiation of multi part failed. @@ -1192,7 +1171,7 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa AWSS3CreateMultipartUploadOutput *output = task.result; //Check if the uploadId is null to safeguard from crash reported in https://github.com/aws/aws-sdk-ios/issues/1060 - if (output.uploadId == (id) [NSNull null] || output.uploadId.length == 0 ) { + if (output.uploadId == (id) [NSNull null] || output.uploadId.length == 0) { AWSDDLogError(@"MultiPartUploadID is null - Failing Transfer"); NSDictionary *userInfo = [NSDictionary dictionaryWithObject:@"MultiPartUploadID is null - Failing Transfer" forKey:@"Message"]; @@ -1201,22 +1180,22 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa code:AWSS3TransferUtilityErrorServerError userInfo:userInfo]; return [AWSTask taskWithError:error]; - } + } transferUtilityMultiPartUploadTask.uploadID = output.uploadId; - + //Save the Multipart Upload in the DB - [AWSS3TransferUtilityDatabaseHelper insertMultiPartUploadRequestInDB:transferUtilityMultiPartUploadTask databaseQueue:self->_databaseQueue]; - + [AWSS3TransferUtilityDatabaseHelper insertMultiPartUploadRequestInDB:transferUtilityMultiPartUploadTask databaseQueue:self.databaseQueue]; + AWSDDLogInfo(@"Initiated multipart upload on server: %@", output.uploadId); AWSDDLogInfo(@"Concurrency Limit is %@", self.transferUtilityConfiguration.multiPartConcurrencyLimit); //Loop through the file and upload the parts one by one for (int32_t i = 1; i <= partCount ; i++) { NSUInteger dataLength = AWSS3TransferUtilityMultiPartSize; if (i == partCount) { - dataLength = fileSize - ( (i-1) * AWSS3TransferUtilityMultiPartSize); + dataLength = fileSize - ((i-1) * AWSS3TransferUtilityMultiPartSize); } - + AWSS3TransferUtilityUploadSubTask *subTask = [AWSS3TransferUtilityUploadSubTask new]; subTask.transferID = transferUtilityMultiPartUploadTask.transferID; subTask.partNumber = @(i); @@ -1226,29 +1205,18 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa subTask.responseData = @""; subTask.file = @""; subTask.eTag = @""; - + subTask.status = AWSS3TransferUtilityTransferStatusWaiting; + NSError *subTaskCreationError; - + //Move to inProgress or Waiting based on concurrency limit - if (i <= [self.transferUtilityConfiguration.multiPartConcurrencyLimit integerValue]) { - subTaskCreationError = [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:NO internalDictionaryToAddSubTaskTo:transferUtilityMultiPartUploadTask.inProgressPartsDictionary]; - if(!subTaskCreationError) { - subTask.status = AWSS3TransferUtilityTransferStatusInProgress; - AWSDDLogDebug(@"Added task for part [%@] to inProgress list", subTask.partNumber); - } - } - else { - subTaskCreationError = [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:NO internalDictionaryToAddSubTaskTo:transferUtilityMultiPartUploadTask.waitingPartsDictionary]; - if (!subTaskCreationError ) { - subTask.status = AWSS3TransferUtilityTransferStatusWaiting; - AWSDDLogDebug(@"Added task for part [%@] to Waiting list", subTask.partNumber); - } - } - + subTaskCreationError = [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:NO]; if (!subTaskCreationError) { + AWSDDLogDebug(@"Added task for part [%@] to waiting list", subTask.partNumber); //Save in Database after the file has been created, so that file can be referenced incase upload is paused and needs to be restarted. - [AWSS3TransferUtilityDatabaseHelper insertMultiPartUploadRequestSubTaskInDB:transferUtilityMultiPartUploadTask subTask:subTask - databaseQueue:self.databaseQueue]; + [AWSS3TransferUtilityDatabaseHelper insertMultiPartUploadRequestSubTaskInDB:transferUtilityMultiPartUploadTask + subTask:subTask + databaseQueue:self.databaseQueue]; } else { //Abort the request, so the server can clean up any partials. [self callAbortMultiPartForUploadTask:transferUtilityMultiPartUploadTask]; @@ -1259,16 +1227,11 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa [self cleanupForMultiPartUploadTask:transferUtilityMultiPartUploadTask]; return [AWSTask taskWithError:subTaskCreationError]; } - } - + //Start the subTasks - for(id taskIdentifier in transferUtilityMultiPartUploadTask.inProgressPartsDictionary) { - AWSS3TransferUtilityUploadSubTask *subTask = [transferUtilityMultiPartUploadTask.inProgressPartsDictionary objectForKey:taskIdentifier]; - AWSDDLogDebug(@"Starting subTask %@", @(subTask.taskIdentifier)); - [subTask.sessionTask resume]; - } - + [transferUtilityMultiPartUploadTask moveWaitingTasksToInProgress:YES]; + return [AWSTask taskWithResult:transferUtilityMultiPartUploadTask]; }]; return [AWSTask taskWithResult:transferUtilityMultiPartUploadTask]; @@ -1408,19 +1371,14 @@ - (nullable NSURL *)createPartialFile:(NSURL *)fileURL return partialFileURL; } --(NSError *) createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *) transferUtilityMultiPartUploadTask - subTask: (AWSS3TransferUtilityUploadSubTask *) subTask -internalDictionaryToAddSubTaskTo: (NSMutableDictionary *) internalDictionaryToAddSubTaskTo - -{ - return [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:YES internalDictionaryToAddSubTaskTo:internalDictionaryToAddSubTaskTo]; +- (NSError *)createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask + subTask:(AWSS3TransferUtilityUploadSubTask *)subTask { + return [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:YES]; } --(NSError *) createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *) transferUtilityMultiPartUploadTask - subTask: (AWSS3TransferUtilityUploadSubTask *) subTask - startTransfer: (BOOL) startTransfer - internalDictionaryToAddSubTaskTo: (NSMutableDictionary *) internalDictionaryToAddSubTaskTo -{ +- (NSError *)createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask + subTask:(AWSS3TransferUtilityUploadSubTask *)subTask + startTransfer:(BOOL)startTransfer { __block NSError *error = nil; //Create a temporary part file if required. if (!(subTask.file || [subTask.file isEqualToString:@""]) || ![[NSFileManager defaultManager] fileExistsAtPath:subTask.file]) { @@ -1483,10 +1441,10 @@ -(NSError *) createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *) tra } //Register transferUtilityMultiPartUploadTask into the taskDictionary for easy lookup in the NSURLCallback - [self->_taskDictionary setObject:transferUtilityMultiPartUploadTask forKey:@(subTask.taskIdentifier)]; + [self registerMultipartUploadTask:transferUtilityMultiPartUploadTask taskIdentifier:subTask.taskIdentifier]; //Add to required internal dictionary - [internalDictionaryToAddSubTaskTo setObject:subTask forKey:@(subTask.taskIdentifier)]; + [transferUtilityMultiPartUploadTask addUploadSubTask:subTask]; //Update Database [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID @@ -1507,39 +1465,72 @@ -(NSError *) createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *) tra return error; } --(void) retryUploadSubTask: (AWSS3TransferUtilityMultiPartUploadTask *) transferUtilityMultiPartUploadTask - subTask: (AWSS3TransferUtilityUploadSubTask *) subTask - startTransfer: (BOOL) startTransfer { - +- (void)completeMultiPartForUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask { + //Call the Multipart completion step here. + AWSTask * finishTask = [self callFinishMultiPartForUploadTask:transferUtilityMultiPartUploadTask]; + [finishTask continueWithBlock:^id (AWSTask *task) { + if (task.error) { + AWSDDLogError(@"Error finishing up MultiPartForUpload Task[%@]", task.error); + transferUtilityMultiPartUploadTask.error = task.error; + transferUtilityMultiPartUploadTask.status = AWSS3TransferUtilityTransferStatusError; + + //Abort the request, so the server can clean up any partials. + [self callAbortMultiPartForUploadTask:transferUtilityMultiPartUploadTask]; + } + else { + //Set progress to 100% and call progressBlock. + AWSDDLogInfo(@"Completed Multipart Transfer: %@", transferUtilityMultiPartUploadTask.uploadID); + transferUtilityMultiPartUploadTask.status = AWSS3TransferUtilityTransferStatusCompleted; + + transferUtilityMultiPartUploadTask.progress.completedUnitCount = transferUtilityMultiPartUploadTask.progress.totalUnitCount; + if (transferUtilityMultiPartUploadTask.expression.progressBlock) { + transferUtilityMultiPartUploadTask.expression.progressBlock(transferUtilityMultiPartUploadTask, transferUtilityMultiPartUploadTask.progress); + } + } + + [self cleanupForMultiPartUploadTask:transferUtilityMultiPartUploadTask]; + + //Call the callback function is specified. + [self completeTask:transferUtilityMultiPartUploadTask]; + return nil; + }]; +} + +- (void)retryUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask + subTask:(AWSS3TransferUtilityUploadSubTask *)subTask + startTransfer:(BOOL)startTransfer { //Track if the task to be retried is in the waiting or inprogress list - BOOL inWaitingPartsDictionary = NO; - - [self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)]; - if ([transferUtilityMultiPartUploadTask.inProgressPartsDictionary objectForKey:@(subTask.taskIdentifier)] ) { - [transferUtilityMultiPartUploadTask.inProgressPartsDictionary removeObjectForKey:@(subTask.taskIdentifier)]; + BOOL inWaitingList = NO; + + [self unregisterTaskIdentifier:subTask.taskIdentifier]; + + if ([transferUtilityMultiPartUploadTask inProgressTaskForTaskIdentifier:subTask.taskIdentifier] != nil) { + // remove + [transferUtilityMultiPartUploadTask removeWaitingUploadSubTask:subTask.taskIdentifier]; transferUtilityMultiPartUploadTask.retryCount = transferUtilityMultiPartUploadTask.retryCount + 1; + } else if ([transferUtilityMultiPartUploadTask waitingTaskForTaskIdentifier:subTask.taskIdentifier] != nil) { + // remove + [transferUtilityMultiPartUploadTask removeInProgressUploadSubTask:subTask.taskIdentifier]; + inWaitingList = YES; } - else if ([transferUtilityMultiPartUploadTask.waitingPartsDictionary objectForKey:@(subTask.taskIdentifier)] ) { - [transferUtilityMultiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(subTask.taskIdentifier)]; - inWaitingPartsDictionary = YES; - } - + //Check if the part file exists if (![[NSFileManager defaultManager] fileExistsAtPath:subTask.file]) { //Set it to nil. This will force the creatUploadSubTask to create the part from the main file subTask.file = nil; } - + NSError *subTaskCreationError; - - if (inWaitingPartsDictionary ) { - subTaskCreationError = [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:startTransfer internalDictionaryToAddSubTaskTo:transferUtilityMultiPartUploadTask.waitingPartsDictionary]; - } - else { - subTaskCreationError = [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:startTransfer internalDictionaryToAddSubTaskTo:transferUtilityMultiPartUploadTask.inProgressPartsDictionary]; + + if (inWaitingList) { + subTask.status = AWSS3TransferUtilityTransferStatusWaiting; + subTaskCreationError = [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:startTransfer]; + } else { + subTask.status = AWSS3TransferUtilityTransferStatusInProgress; + subTaskCreationError = [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:startTransfer]; } - - if ( subTaskCreationError ) { + + if (subTaskCreationError) { //cancel the multipart transfer [transferUtilityMultiPartUploadTask cancel]; transferUtilityMultiPartUploadTask.status = AWSS3TransferUtilityTransferStatusError; @@ -1629,7 +1620,7 @@ -(void) retryUploadSubTask: (AWSS3TransferUtilityMultiPartUploadTask *) transfer transferUtilityDownloadTask.status = AWSS3TransferUtilityTransferStatusInProgress; //Create task in database - [AWSS3TransferUtilityDatabaseHelper insertDownloadTransferRequestInDB:transferUtilityDownloadTask databaseQueue:self->_databaseQueue]; + [AWSS3TransferUtilityDatabaseHelper insertDownloadTransferRequestInDB:transferUtilityDownloadTask databaseQueue:self.databaseQueue]; return [self createDownloadTask:transferUtilityDownloadTask]; } @@ -1696,9 +1687,8 @@ -(void) retryUploadSubTask: (AWSS3TransferUtilityMultiPartUploadTask *) transfer } - (void) retryDownload: (AWSS3TransferUtilityDownloadTask *) transferUtilityDownloadTask { - //Remove from taskDictionary - [self.taskDictionary removeObjectForKey:@(transferUtilityDownloadTask.sessionTask.taskIdentifier)]; + [self unregisterTaskIdentifier:transferUtilityDownloadTask.sessionTask.taskIdentifier]; AWSDDLogDebug(@"Removed object from key %@", @(transferUtilityDownloadTask.sessionTask.taskIdentifier) ); transferUtilityDownloadTask.retryCount = transferUtilityDownloadTask.retryCount + 1; @@ -1789,6 +1779,15 @@ - (void)internalEnumerateToAssignBlocksForUploadTask:(void (^)(AWSS3TransferUtil } } +- (void)registerMultipartUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)multiPartUploadTask + taskIdentifier:(NSUInteger)taskIdentifier { + [self.taskDictionary setObject:multiPartUploadTask forKey:@(taskIdentifier)]; +} + +- (void)unregisterTaskIdentifier:(NSUInteger)taskIdentifier { + [self.taskDictionary removeObjectForKey:@(taskIdentifier)]; +} + - (AWSS3TransferUtilityTask *)findTransferUtilityTask:(NSURLSessionTask *)task { id obj = [self.taskDictionary objectForKey:@(task.taskIdentifier)]; if (!obj) { @@ -1877,15 +1876,77 @@ - (NSMutableArray *) getTasksHelper:(AWSSynchronizedMutableDictionary *)dictiona return tasks; } +#pragma mark - Suspend and Resume + +/// Suspends all active tasks. Downloads will be +- (void)suspendAllMultipartUploadsWithCompletionHandler:(nullable AWSS3TransferUtilityMultiPartUploadSuspendBlock)completionHandler { + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + [[self getMultiPartUploadTasks] continueWithBlock:^id (AWSTask *> * _Nonnull task) { + NSError *error = nil; + if (task.error) { + AWSDDLogError(@"Error getting MultiPartForUpload Tasks [%@]", task.error); + completionHandler(task.error); + return nil; + } else { + AWSDDLogInfo(@"Suspending MultiPartForUpload Tasks: %lu", task.result.count); + NSArray * tasks = task.result; + for (AWSS3TransferUtilityMultiPartUploadTask *multipartUploadTask in tasks) { + NSCAssert(multipartUploadTask != nil, @"Task cannot be nil"); + [multipartUploadTask suspend]; + if (multipartUploadTask.error != nil) { + error = multipartUploadTask.error; + break; + } + } + } + + if (completionHandler) { + completionHandler(error); + } + + return nil; + }]; + }); +} + +- (void)resumeAllMultipartUploadsWithCompletionHandler:(nullable AWSS3TransferUtilityMultiPartUploadResumeBlock)completionHandler { + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + [[self getMultiPartUploadTasks] continueWithBlock:^id (AWSTask *> * _Nonnull task) { + NSError *error = nil; + if (task.error) { + AWSDDLogError(@"Error getting MultiPartForUpload Tasks [%@]", task.error); + completionHandler(task.error); + return nil; + } else { + AWSDDLogInfo(@"Resuming MultiPartForUpload Tasks: %lu", task.result.count); + NSArray * tasks = task.result; + for (AWSS3TransferUtilityMultiPartUploadTask *multipartUploadTask in tasks) { + NSCAssert(multipartUploadTask != nil, @"Task cannot be nil"); + [multipartUploadTask resume]; + if (multipartUploadTask.error != nil) { + error = multipartUploadTask.error; + break; + } + } + } + + if (completionHandler) { + completionHandler(error); + } + + return nil; + }]; + }); +} + #pragma mark - Internal helper methods - (AWSTask *)callFinishMultiPartForUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)uploadTask { - - NSMutableArray *completedParts = [NSMutableArray arrayWithCapacity:[uploadTask.completedPartsSet count]]; + NSMutableArray *completedParts = [NSMutableArray arrayWithCapacity:uploadTask.completedTasks.count]; NSMutableDictionary *tempDictionary = [NSMutableDictionary new]; //Create a new Dictionary with the partNumber as the Key - for(AWSS3TransferUtilityUploadSubTask *subTask in uploadTask.completedPartsSet) { + for(AWSS3TransferUtilityUploadSubTask *subTask in uploadTask.completedTasks) { [tempDictionary setObject:subTask forKey:subTask.partNumber]; } @@ -1910,7 +1971,7 @@ - (AWSTask *)callFinishMultiPartForUploadTask:(AWSS3TransferUtilityMultiPartUplo return [self.s3 completeMultipartUpload:compReq]; } -- (AWSTask *) callAbortMultiPartForUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *) uploadTask { +- (AWSTask *)callAbortMultiPartForUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)uploadTask { AWSS3AbortMultipartUploadRequest *abortReq = [AWSS3AbortMultipartUploadRequest new]; abortReq.bucket = uploadTask.bucket; abortReq.uploadId = uploadTask.uploadID; @@ -2062,7 +2123,7 @@ - (void)URLSession:(NSURLSession *)session } //Mark status as completed if there is no error. - if (! uploadTask.error ) { + if (!uploadTask.error) { uploadTask.status = AWSS3TransferUtilityTransferStatusCompleted; //Set progress to 100% and call the progress block uploadTask.progress.completedUnitCount = uploadTask.progress.totalUnitCount; @@ -2089,20 +2150,21 @@ - (void)URLSession:(NSURLSession *)session return; } - AWSS3TransferUtilityUploadSubTask *subTask = [transferUtilityMultiPartUploadTask.waitingPartsDictionary objectForKey:@(task.taskIdentifier)]; + AWSS3TransferUtilityUploadSubTask *subTask = [transferUtilityMultiPartUploadTask waitingTaskForTaskIdentifier:task.taskIdentifier]; if (subTask) { - //Add it to inProgress list - [transferUtilityMultiPartUploadTask.inProgressPartsDictionary setObject:subTask forKey:@(subTask.taskIdentifier)]; - - //Remove it from the waitingList - [transferUtilityMultiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(subTask.taskIdentifier)]; + [transferUtilityMultiPartUploadTask moveWaitingTaskToInProgress:subTask]; } - subTask = [transferUtilityMultiPartUploadTask.inProgressPartsDictionary objectForKey:@(task.taskIdentifier)]; + subTask = [transferUtilityMultiPartUploadTask inProgressTaskForTaskIdentifier:task.taskIdentifier]; if (!subTask) { AWSDDLogDebug(@"Unable to find information for task %lu in inProgress Dictionary", (unsigned long)task.taskIdentifier); return; } + + if (subTask.status == AWSS3TransferUtilityTransferStatusPaused) { + AWSDDLogDebug(@"Subtask is paused: %lu", (unsigned long)task.taskIdentifier); + return; + } //Check if the task was cancelled. if (transferUtilityMultiPartUploadTask.cancelled) { @@ -2116,155 +2178,18 @@ - (void)URLSession:(NSURLSession *)session [self cleanupForMultiPartUploadTask:transferUtilityMultiPartUploadTask]; return; } - + //Check if there was an error. if (error) { - - //Retrying if a 500, 503 or 400 RequestTimeout error occured. - if ([self isErrorRetriable:HTTPResponse.statusCode responseFromServer:subTask.responseData]) { - AWSDDLogDebug(@"Received a 500, 503 or 400 error. Response Data is [%@]", subTask.responseData); - if (transferUtilityMultiPartUploadTask.retryCount < self.transferUtilityConfiguration.retryLimit) { - AWSDDLogDebug(@"Retry count is below limit and error is retriable. "); - [self retryUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:YES]; - return; - } - } - - if(subTask.responseData != nil && [subTask.responseData isEqualToString:@""]) { - // Transfer's multi-part subtask does not have raw data access, so only check string based response data. - [self extractErrorInformation: [subTask responseData] - userInfo: userInfo]; - } - NSError *updatedError = [[NSError alloc] initWithDomain:error.domain code:error.code userInfo:userInfo]; - - //Error is not retriable. - transferUtilityMultiPartUploadTask.error = updatedError; - transferUtilityMultiPartUploadTask.status = AWSS3TransferUtilityTransferStatusError; - - //Execute call back if provided. - [self completeTask:transferUtilityMultiPartUploadTask]; - - //Make sure all other parts that are in progress are canceled. - for (NSNumber *key in [transferUtilityMultiPartUploadTask.inProgressPartsDictionary allKeys]) { - AWSS3TransferUtilityUploadSubTask *subTask = [transferUtilityMultiPartUploadTask.inProgressPartsDictionary objectForKey:key]; - [subTask.sessionTask cancel]; - } - - for (NSNumber *key in [transferUtilityMultiPartUploadTask.waitingPartsDictionary allKeys]) { - AWSS3TransferUtilityUploadSubTask *subTask = [transferUtilityMultiPartUploadTask.waitingPartsDictionary objectForKey:key]; - [subTask.sessionTask cancel]; - } - - //Abort the request, so the server can clean up any partials. - [self callAbortMultiPartForUploadTask:transferUtilityMultiPartUploadTask]; - - //clean up. - [self cleanupForMultiPartUploadTask:transferUtilityMultiPartUploadTask]; - return; - } - - NSHTTPURLResponse *HTTPResponse = (NSHTTPURLResponse *) task.response; - subTask.eTag = (NSString *) HTTPResponse.allHeaderFields[@"ETAG"]; - - //Add it to completed parts and remove it from remaining parts. - [transferUtilityMultiPartUploadTask.completedPartsSet addObject:subTask]; - [transferUtilityMultiPartUploadTask.inProgressPartsDictionary removeObjectForKey:@(subTask.taskIdentifier)]; - //Update progress - transferUtilityMultiPartUploadTask.progress.completedUnitCount = transferUtilityMultiPartUploadTask.progress.completedUnitCount - subTask.totalBytesSent + subTask.totalBytesExpectedToSend; - - //Delete the temporary upload file for this subTask - [self removeFile:subTask.file]; - subTask.status = AWSS3TransferUtilityTransferStatusCompleted; - - //Update Database - [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID - partNumber:subTask.partNumber - taskIdentifier:subTask.taskIdentifier - eTag:subTask.eTag - status:subTask.status - retry_count:transferUtilityMultiPartUploadTask.retryCount databaseQueue:self.databaseQueue]; - - //If there are parts waiting to be uploaded, pick from the waiting parts list and move it to inProgress - if ([transferUtilityMultiPartUploadTask.waitingPartsDictionary count] > 0) { - long numberOfPartsInProgress = [transferUtilityMultiPartUploadTask.inProgressPartsDictionary count]; - while (numberOfPartsInProgress < [self.transferUtilityConfiguration.multiPartConcurrencyLimit integerValue]) { - if ([transferUtilityMultiPartUploadTask.waitingPartsDictionary count] > 0) { - //Get a part from the waitingList - AWSS3TransferUtilityUploadSubTask *nextSubTask = [[transferUtilityMultiPartUploadTask.waitingPartsDictionary allValues] objectAtIndex:0]; - - //Add to inProgress list - [transferUtilityMultiPartUploadTask.inProgressPartsDictionary setObject:nextSubTask forKey:@(nextSubTask.taskIdentifier)]; - - //Remove it from the waitingList - [transferUtilityMultiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(nextSubTask.taskIdentifier)]; - AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(nextSubTask.taskIdentifier), transferUtilityMultiPartUploadTask.uploadID); - [nextSubTask.sessionTask resume]; - numberOfPartsInProgress++; - continue; - } - break; - } - } - else if ([transferUtilityMultiPartUploadTask.inProgressPartsDictionary count] == 0) { - //If there are no more inProgress parts, then we are done. - - //Validate that all the content has been uploaded. - int64_t totalBytesSent = 0; - for (AWSS3TransferUtilityUploadSubTask *aSubTask in transferUtilityMultiPartUploadTask.completedPartsSet) { - totalBytesSent += aSubTask.totalBytesExpectedToSend; - } - - if (totalBytesSent != transferUtilityMultiPartUploadTask.contentLength.longLongValue ) { - NSString *errorMessage = [NSString stringWithFormat:@"Expected to send [%@], but sent [%@] and there are no remaining parts. Failing transfer ", - transferUtilityMultiPartUploadTask.contentLength, @(totalBytesSent)]; - AWSDDLogDebug(@"%@", errorMessage); - NSDictionary *userInfo = [NSDictionary dictionaryWithObject:errorMessage - forKey:@"Message"]; - - transferUtilityMultiPartUploadTask.error = [NSError errorWithDomain:AWSS3TransferUtilityErrorDomain - code:AWSS3TransferUtilityErrorClientError - userInfo:userInfo]; - - //Execute call back if provided. - [self completeTask:transferUtilityMultiPartUploadTask]; - - //Abort the request, so the server can clean up any partials. - [self callAbortMultiPartForUploadTask:transferUtilityMultiPartUploadTask]; - - //clean up. - [self cleanupForMultiPartUploadTask:transferUtilityMultiPartUploadTask]; - return; - } - - - //Call the Multipart completion step here. - [[ self callFinishMultiPartForUploadTask:transferUtilityMultiPartUploadTask] continueWithBlock:^id (AWSTask *task) { - if (task.error) { - AWSDDLogError(@"Error finishing up MultiPartForUpload Task[%@]", task.error); - transferUtilityMultiPartUploadTask.error = error; - transferUtilityMultiPartUploadTask.status = AWSS3TransferUtilityTransferStatusError; - - //Abort the request, so the server can clean up any partials. - [self callAbortMultiPartForUploadTask:transferUtilityMultiPartUploadTask]; - - } - else { - //Set progress to 100% and call progressBlock. - AWSDDLogInfo(@"Completed Multipart Transfer: %@", transferUtilityMultiPartUploadTask.uploadID); - transferUtilityMultiPartUploadTask.status = AWSS3TransferUtilityTransferStatusCompleted; - - transferUtilityMultiPartUploadTask.progress.completedUnitCount = transferUtilityMultiPartUploadTask.progress.totalUnitCount; - if (transferUtilityMultiPartUploadTask.expression.progressBlock ) { - transferUtilityMultiPartUploadTask.expression.progressBlock(transferUtilityMultiPartUploadTask, transferUtilityMultiPartUploadTask.progress); - } - } - - [self cleanupForMultiPartUploadTask:transferUtilityMultiPartUploadTask]; - - //Call the callback function is specified. - [self completeTask:transferUtilityMultiPartUploadTask]; - return nil; - }]; + [self processMultipartUploadTaskError:error + usingHTTPResponse:HTTPResponse + andUserInfo:userInfo + forSubTask:subTask + withMultipartUploadTask:transferUtilityMultiPartUploadTask]; + } else { + [transferUtilityMultiPartUploadTask completeUploadSubTask:subTask usingHTTPResponse:HTTPResponse]; + [transferUtilityMultiPartUploadTask moveWaitingTasksToInProgress:YES]; + [transferUtilityMultiPartUploadTask completeIfDone]; } } } @@ -2285,23 +2210,23 @@ - (void)URLSession:(NSURLSession *)session //Check if the task was cancelled. if (downloadTask.cancelled) { [self.completedTaskDictionary setObject:downloadTask forKey:downloadTask.transferID]; - [self.taskDictionary removeObjectForKey:@(downloadTask.sessionTask.taskIdentifier)]; + [self unregisterTaskIdentifier:downloadTask.sessionTask.taskIdentifier]; [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:downloadTask.transferID databaseQueue:_databaseQueue]; return; } - + //Make sure to not overwrite if an error has already been set on the downloadTask if (!downloadTask.error) { downloadTask.error = error; } - - if(!downloadTask.error ) { + + if (!downloadTask.error) { downloadTask.status = AWSS3TransferUtilityTransferStatusCompleted; } else { downloadTask.status = AWSS3TransferUtilityTransferStatusError; } - + if (downloadTask.error && HTTPResponse) { if ([self isErrorRetriable:HTTPResponse.statusCode responseFromServer:downloadTask.responseData]) { if (downloadTask.retryCount < self.transferUtilityConfiguration.retryLimit) { @@ -2321,7 +2246,7 @@ - (void)URLSession:(NSURLSession *)session NSError *updatedError = [[NSError alloc] initWithDomain:downloadTask.error.domain code:downloadTask.error.code userInfo:userInfo]; downloadTask.error = updatedError; } - + if (!downloadTask.error) { downloadTask.progress.completedUnitCount = downloadTask.progress.totalUnitCount; if (downloadTask.expression.progressBlock) { @@ -2329,12 +2254,57 @@ - (void)URLSession:(NSURLSession *)session } } [self.completedTaskDictionary setObject:downloadTask forKey:downloadTask.transferID]; - [self.taskDictionary removeObjectForKey:@(downloadTask.sessionTask.taskIdentifier)]; + [self unregisterTaskIdentifier:downloadTask.sessionTask.taskIdentifier]; [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:downloadTask.transferID databaseQueue:_databaseQueue]; [self completeTask:downloadTask]; } } +- (void)processMultipartUploadTaskError:(NSError *)error + usingHTTPResponse:(NSHTTPURLResponse *)HTTPResponse + andUserInfo:(NSMutableDictionary *)userInfo + forSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask + withMultipartUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)multipartUploadTask { + //Retrying if a 500, 503 or 400 RequestTimeout error occured. + if ([self isErrorRetriable:HTTPResponse.statusCode responseFromServer:subTask.responseData]) { + AWSDDLogDebug(@"Received a 500, 503 or 400 error. Response Data is [%@]", subTask.responseData); + if (multipartUploadTask.retryCount < self.transferUtilityConfiguration.retryLimit) { + AWSDDLogDebug(@"Retry count is below limit and error is retriable. "); + [self retryUploadSubTask:multipartUploadTask subTask:subTask startTransfer:YES]; + return; + } + } + + if (subTask.responseData != nil && [subTask.responseData isEqualToString:@""]) { + // Transfer's multi-part subtask does not have raw data access, so only check string based response data. + [self extractErrorInformation: [subTask responseData] + userInfo: userInfo]; + } + NSError *updatedError = [[NSError alloc] initWithDomain:error.domain code:error.code userInfo:userInfo]; + + //Error is not retriable. + multipartUploadTask.error = updatedError; + multipartUploadTask.status = AWSS3TransferUtilityTransferStatusError; + + //Execute call back if provided. + [self completeTask:multipartUploadTask]; + + //Make sure all other parts that are in progress are canceled. + for (AWSS3TransferUtilityUploadSubTask *subTask in multipartUploadTask.inProgressTasks) { + [subTask.sessionTask cancel]; + } + + for (AWSS3TransferUtilityUploadSubTask *subTask in multipartUploadTask.waitingTasks) { + [subTask.sessionTask cancel]; + } + + //Abort the request, so the server can clean up any partials. + [self callAbortMultiPartForUploadTask:multipartUploadTask]; + + //clean up. + [self cleanupForMultiPartUploadTask:multipartUploadTask]; +} + - (void)URLSession:(NSURLSession *)session task:(NSURLSessionTask *)task didSendBodyData:(int64_t)bytesSent @@ -2361,33 +2331,31 @@ - (void)URLSession:(NSURLSession *)session transferUtilityUploadTask.expression.progressBlock(transferUtilityUploadTask, transferUtilityUploadTask.progress); } } - } - else if ([transferUtilityTask isKindOfClass:[AWSS3TransferUtilityMultiPartUploadTask class]]) { + } else if ([transferUtilityTask isKindOfClass:[AWSS3TransferUtilityMultiPartUploadTask class]]) { //Get the multipart upload task AWSS3TransferUtilityMultiPartUploadTask *transferUtilityMultiPartUploadTask = [self.taskDictionary objectForKey:@(task.taskIdentifier)]; //Get multipart upload sub task - AWSS3TransferUtilityUploadSubTask *subTask = [transferUtilityMultiPartUploadTask.inProgressPartsDictionary objectForKey:@(task.taskIdentifier)]; + AWSS3TransferUtilityUploadSubTask *subTask = [transferUtilityMultiPartUploadTask inProgressTaskForTaskIdentifier:task.taskIdentifier]; subTask.totalBytesSent = totalBytesSent; - - + //Calculate the total sent so far int64_t totalSentSoFar = 0; //Create a new Dictionary with the partNumber as the Key NSMutableDictionary *completedPartsByPartNumber = [NSMutableDictionary new]; - for(AWSS3TransferUtilityUploadSubTask *subTask in transferUtilityMultiPartUploadTask.completedPartsSet) { + for (AWSS3TransferUtilityUploadSubTask *subTask in transferUtilityMultiPartUploadTask.completedTasks) { [completedPartsByPartNumber setObject:subTask forKey:subTask.partNumber]; } - for (AWSS3TransferUtilityUploadSubTask *aSubTask in [completedPartsByPartNumber allValues]) { + for (AWSS3TransferUtilityUploadSubTask *aSubTask in completedPartsByPartNumber.allValues) { totalSentSoFar += aSubTask.totalBytesExpectedToSend; } - for (AWSS3TransferUtilityUploadSubTask *aSubTask in [transferUtilityMultiPartUploadTask.inProgressPartsDictionary allValues]) { + for (AWSS3TransferUtilityUploadSubTask *aSubTask in transferUtilityMultiPartUploadTask.inProgressTasks) { totalSentSoFar += aSubTask.totalBytesSent; } - - if (transferUtilityMultiPartUploadTask.progress.completedUnitCount != totalSentSoFar ) { + + if (transferUtilityMultiPartUploadTask.progress.completedUnitCount != totalSentSoFar) { transferUtilityMultiPartUploadTask.progress.totalUnitCount = [transferUtilityMultiPartUploadTask.contentLength longLongValue]; transferUtilityMultiPartUploadTask.progress.completedUnitCount = totalSentSoFar; - + //execute the callback to the progressblock if present. if (transferUtilityMultiPartUploadTask.expression.progressBlock) { AWSDDLogDebug(@"Total %lld, ProgressSoFar %lld", transferUtilityMultiPartUploadTask.progress.totalUnitCount, transferUtilityMultiPartUploadTask.progress.completedUnitCount); @@ -2409,27 +2377,25 @@ - (void)completeTask:(AWSS3TransferUtilityTask *)task removeCompletedTask:(BOOL) if (removeCompletedTask) { // complete task before removing from dictionaries [self.completedTaskDictionary removeObjectForKey:task.transferID]; - [self.taskDictionary removeObjectForKey:@(task.taskIdentifier)]; + [self unregisterTaskIdentifier:task.taskIdentifier]; } - } -- (void) cleanupForMultiPartUploadTask: (AWSS3TransferUtilityMultiPartUploadTask *) task { - +- (void)cleanupForMultiPartUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)task { //Add it to list of completed Tasks [self.completedTaskDictionary setObject:task forKey:task.transferID]; - + //Remove all entries from taskDictionary. - for ( AWSS3TransferUtilityUploadSubTask *subTask in [task.inProgressPartsDictionary allValues] ) { - [self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)]; + for (AWSS3TransferUtilityUploadSubTask *subTask in task.inProgressTasks) { + [self unregisterTaskIdentifier:subTask.taskIdentifier]; [self removeFile:subTask.file]; } - + //Remove temporary file if required. if (task.temporaryFileCreated) { [self removeFile:task.file]; } - + //Remove data from the Database. [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:task.transferID databaseQueue:_databaseQueue]; } @@ -2439,7 +2405,7 @@ - (void) cleanupForUploadTask: (AWSS3TransferUtilityUploadTask *) uploadTask { [self.completedTaskDictionary setObject:uploadTask forKey:uploadTask.transferID]; //Remove entry from taskDictionary - [self.taskDictionary removeObjectForKey:@(uploadTask.taskIdentifier)]; + [self unregisterTaskIdentifier:uploadTask.taskIdentifier]; //Remove temporary file if required. if (uploadTask.temporaryFileCreated) { @@ -2492,8 +2458,7 @@ - (void)extractErrorInformation:(NSString *)responseString } } -- (void) removeFile: (NSString *) absolutePath -{ +- (void)removeFile:(NSString *)absolutePath { if (!absolutePath || ![[NSFileManager defaultManager ] fileExistsAtPath:absolutePath]) { return; } diff --git a/AWSS3/AWSS3TransferUtilityDatabaseHelper.m b/AWSS3/AWSS3TransferUtilityDatabaseHelper.m index 76e80a64226..fcdc27aa954 100644 --- a/AWSS3/AWSS3TransferUtilityDatabaseHelper.m +++ b/AWSS3/AWSS3TransferUtilityDatabaseHelper.m @@ -127,6 +127,9 @@ + (void) updateTransferRequestInDB: (NSString *) transferID status: (AWSS3TransferUtilityTransferStatusType) status retry_count: (NSUInteger) retryCount databaseQueue: (AWSFMDatabaseQueue *) databaseQueue { + if (eTag == nil) { + eTag = @""; + } NSString *const AWSS3TransferUtilityUpdateTransferUtilityStatusAndETag = @"UPDATE awstransfer " @"SET status=:status, etag = :etag, session_task_id = :session_task_id, retry_count = :retry_count " @"WHERE transfer_id=:transfer_id and " diff --git a/AWSS3/AWSS3TransferUtilityTasks.h b/AWSS3/AWSS3TransferUtilityTasks.h index 0140e3acbd8..d9d51de34b2 100644 --- a/AWSS3/AWSS3TransferUtilityTasks.h +++ b/AWSS3/AWSS3TransferUtilityTasks.h @@ -87,6 +87,20 @@ typedef void (^AWSS3TransferUtilityMultiPartProgressBlock) (AWSS3TransferUtility NSProgress *progress); +/** + The suspend multipart upload completion handler. + + @param error Returns the error object when the suspending failed. Returns `nil` on success. + */ +typedef void (^AWSS3TransferUtilityMultiPartUploadSuspendBlock) (NSError * _Nullable error); + +/** + The resume multipart upload completion handler. + + @param error Returns the error object when the resume failed. Returns `nil` on success. + */ +typedef void (^AWSS3TransferUtilityMultiPartUploadResumeBlock) (NSError * _Nullable error); + #pragma mark - AWSS3TransferUtilityTasks /** @@ -140,6 +154,11 @@ typedef void (^AWSS3TransferUtilityMultiPartProgressBlock) (AWSS3TransferUtility */ @property (nullable, readonly) NSHTTPURLResponse *response; +/** + Error after operation. + */ +@property (nullable, readonly) NSError *error; + /** Cancels the task. */ diff --git a/AWSS3/AWSS3TransferUtilityTasks.m b/AWSS3/AWSS3TransferUtilityTasks.m index 0e91e5dfb44..77486c117d1 100644 --- a/AWSS3/AWSS3TransferUtilityTasks.m +++ b/AWSS3/AWSS3TransferUtilityTasks.m @@ -19,6 +19,7 @@ #import "AWSS3PreSignedURL.h" #import +#import "AWSS3TransferUtility.h" #import "AWSS3TransferUtilityTasks+Completion.h" #import "AWSS3TransferUtility_private.h" @@ -64,7 +65,7 @@ - (void)suspend { //Pause called on a transfer that is not in progress. No op. return; } - + [self.sessionTask suspend]; self.status = AWSS3TransferUtilityTransferStatusPaused; [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID @@ -128,10 +129,37 @@ - (instancetype)init { _waitingPartsDictionary = [NSMutableDictionary new]; _inProgressPartsDictionary = [NSMutableDictionary new]; _completedPartsSet = [NSMutableSet new]; + _serialQueue = dispatch_queue_create("com.amazonaws.AWSS3.MultipartUploadTask", DISPATCH_QUEUE_SERIAL); } return self; } +- (BOOL)isUnderConcurrencyLimit { + NSUInteger dynamicLimit = NSProcessInfo.processInfo.activeProcessorCount * 2; + NSUInteger configuredLimit = self.transferUtility.transferUtilityConfiguration.multiPartConcurrencyLimit.integerValue; + return self.inProgressPartsDictionary.count < MAX(dynamicLimit, configuredLimit); +} + +- (BOOL)hasWaitingTasks { + return self.waitingTasks.count > 0; +} + +- (BOOL)isDone { + return _waitingPartsDictionary.count == 0 && _inProgressPartsDictionary.count == 0; +} + +- (NSArray *)waitingTasks { + return self.waitingPartsDictionary.allValues; +} + +- (NSArray *)inProgressTasks { + return self.inProgressPartsDictionary.allValues; +} + +- (NSArray *)completedTasks { + return self.completedPartsSet.allObjects; +} + - (AWSS3TransferUtilityMultiPartUploadExpression *)expression { if (!_expression) { _expression = [AWSS3TransferUtilityMultiPartUploadExpression new]; @@ -142,13 +170,11 @@ - (AWSS3TransferUtilityMultiPartUploadExpression *)expression { - (void)cancel { self.cancelled = YES; self.status = AWSS3TransferUtilityTransferStatusCancelled; - for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) { - AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key]; + for (AWSS3TransferUtilityUploadSubTask *subTask in self.inProgressTasks) { [subTask.sessionTask cancel]; } - for (NSNumber *key in [self.waitingPartsDictionary allKeys]) { - AWSS3TransferUtilityUploadSubTask *subTask = [self.waitingPartsDictionary objectForKey:key]; + for (AWSS3TransferUtilityUploadSubTask *subTask in self.waitingTasks) { [subTask.sessionTask cancel]; } @@ -156,32 +182,22 @@ - (void)cancel { } - (void)resume { - if (self.status != AWSS3TransferUtilityTransferStatusPaused ) { + // no parts should be paused. instead all in progress part uploads should be canceled and + // replaced with a new subtask which has not been started so that resuming will instead + // start up to the number up to the concurrency limit. + if (self.status != AWSS3TransferUtilityTransferStatusPaused) { //Resume called on a transfer that hasn't been paused. No op. return; } - - for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) { - AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key]; - subTask.status = AWSS3TransferUtilityTransferStatusInProgress; - [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID - partNumber:subTask.partNumber - taskIdentifier:subTask.taskIdentifier - eTag:subTask.eTag - status:subTask.status - retry_count:self.retryCount - databaseQueue:self.databaseQueue]; - [subTask.sessionTask resume]; + + NSCAssert(self.transferUtility != nil, @"Transfer Utility must be provided."); + + // Change status from paused to waiting + for (AWSS3TransferUtilityUploadSubTask * nextSubTask in self.waitingTasks) { + nextSubTask.status = AWSS3TransferUtilityTransferStatusWaiting; } - self.status = AWSS3TransferUtilityTransferStatusInProgress; - //Update the Master Record - [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID - partNumber:@0 - taskIdentifier:0 - eTag:@"" - status:self.status - retry_count:self.retryCount - databaseQueue:self.databaseQueue]; + + [self moveWaitingTasksToInProgress:YES]; } - (void)suspend { @@ -189,20 +205,48 @@ - (void)suspend { //Pause called on a transfer that is not in progresss. No op. return; } - - for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) { - AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key]; - [subTask.sessionTask suspend]; + + NSCAssert(self.transferUtility != nil, @"Transfer Utility must be provided."); + + // Cancel session task for all subtasks which are in progress and set status to paused + for (AWSS3TransferUtilityUploadSubTask *inProgressSubTask in self.inProgressTasks) { + // Note: This can happen due to lack of thread-safety + if (!inProgressSubTask) { + NSCAssert(NO, @"Sub Task should not be nil!"); + continue; + } + + // cancel the URLSessionTask + inProgressSubTask.status = AWSS3TransferUtilityTransferStatusPaused; + [inProgressSubTask.sessionTask cancel]; + + AWSS3TransferUtilityUploadSubTask *subTask = [AWSS3TransferUtilityUploadSubTask new]; + subTask.transferID = self.transferID; + subTask.partNumber = inProgressSubTask.partNumber; + subTask.transferType = @"MULTI_PART_UPLOAD_SUB_TASK"; + subTask.totalBytesExpectedToSend = inProgressSubTask.totalBytesExpectedToSend; + subTask.totalBytesSent = (long long) 0; + subTask.responseData = @""; + subTask.file = @""; + subTask.eTag = @""; subTask.status = AWSS3TransferUtilityTransferStatusPaused; - - [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID - partNumber:subTask.partNumber - taskIdentifier:subTask.taskIdentifier - eTag:subTask.eTag - status:subTask.status - retry_count:self.retryCount - databaseQueue:self.databaseQueue]; + + NSError *error = [self.transferUtility createUploadSubTask:self + subTask:subTask + startTransfer:NO]; + + if (error) { + AWSDDLogError(@"Error creating AWSS3TransferUtilityUploadSubTask [%@]", error); + self.error = error; + } else { + self.inProgressPartsDictionary[@(inProgressSubTask.taskIdentifier)] = nil; + + [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:inProgressSubTask.transferID partNumber:inProgressSubTask.partNumber taskIdentifier:inProgressSubTask.taskIdentifier eTag:nil status:AWSS3TransferUtilityTransferStatusCancelled retry_count:0 databaseQueue:self.databaseQueue]; + + [AWSS3TransferUtilityDatabaseHelper insertMultiPartUploadRequestSubTaskInDB:self subTask:subTask databaseQueue:self.databaseQueue]; + } } + self.status = AWSS3TransferUtilityTransferStatusPaused; //Update the Master Record [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID @@ -214,8 +258,174 @@ - (void)suspend { databaseQueue:self.databaseQueue]; } --(void) setCompletionHandler:(AWSS3TransferUtilityMultiPartUploadCompletionHandlerBlock)completionHandler { - +- (void)addUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask { + if (subTask.status == AWSS3TransferUtilityTransferStatusUnknown || + subTask.status == AWSS3TransferUtilityTransferStatusPaused || + subTask.status == AWSS3TransferUtilityTransferStatusWaiting) { + self.waitingPartsDictionary[@(subTask.taskIdentifier)] = subTask; + } else if (subTask.status == AWSS3TransferUtilityTransferStatusInProgress) { + self.inProgressPartsDictionary[@(subTask.taskIdentifier)] = subTask; + } else if (subTask.status == AWSS3TransferUtilityTransferStatusCompleted) { + [self.completedPartsSet addObject:subTask]; + } else { + AWSDDLogDebug(@"Sub Task status not supported: %lu", subTask.status); + NSCAssert(NO, @"Status not supported"); + } + + [self completeIfDone]; +} + +- (void)removeWaitingUploadSubTask:(NSUInteger)taskIdentifier { + self.waitingPartsDictionary[@(taskIdentifier)] = nil; +} + +- (void)removeInProgressUploadSubTask:(NSUInteger)taskIdentifier { + self.inProgressPartsDictionary[@(taskIdentifier)] = nil; +} + +- (AWSS3TransferUtilityUploadSubTask *)waitingTaskForTaskIdentifier:(NSUInteger)taskIdentifier { + return self.waitingPartsDictionary[@(taskIdentifier)]; +} + +- (AWSS3TransferUtilityUploadSubTask *)inProgressTaskForTaskIdentifier:(NSUInteger)taskIdentifier { + return self.inProgressPartsDictionary[@(taskIdentifier)]; +} + +- (void)moveWaitingTaskToInProgress:(AWSS3TransferUtilityUploadSubTask *)subTask { + [self moveWaitingTaskToInProgress:subTask startTransfer:NO]; +} + +- (void)moveWaitingTaskToInProgress:(AWSS3TransferUtilityUploadSubTask *)subTask startTransfer:(BOOL)startTransfer { + if ([self.waitingTasks containsObject:subTask]) { + // Add to inProgress list + self.inProgressPartsDictionary[@(subTask.taskIdentifier)] = subTask; + // Remove it from the waitingList + self.waitingPartsDictionary[@(subTask.taskIdentifier)] = nil; + AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(subTask.taskIdentifier), self.uploadID); + + if (startTransfer) { + AWSDDLogDebug(@"Starting subTask %@", @(subTask.taskIdentifier)); + NSCAssert(subTask.sessionTask.state == NSURLSessionTaskStateSuspended, @"State should be suspended before resuming."); + [subTask.sessionTask resume]; + } + } +} + +- (void)moveInProgressAndSuspendedTasks { + // move suspended tasks from in progress to waiting to allow multipart upload process to run properly + NSMutableArray *inProgressAndSuspendedTasks = @[].mutableCopy; + + for (AWSS3TransferUtilityUploadSubTask *aSubTask in self.inProgressTasks) { + if (aSubTask.sessionTask.state == NSURLSessionTaskStateSuspended) { + AWSDDLogDebug(@"Subtask for multipart upload is suspended: %ld", aSubTask.taskIdentifier); + [inProgressAndSuspendedTasks addObject:aSubTask]; + } + } + + for (AWSS3TransferUtilityUploadSubTask *aSubTask in inProgressAndSuspendedTasks) { + self.inProgressPartsDictionary[@(aSubTask.taskIdentifier)] = nil; + self.waitingPartsDictionary[@(aSubTask.taskIdentifier)] = aSubTask; + } +} + +- (void)moveWaitingTasksToInProgress { + [self moveWaitingTasksToInProgress:NO]; +} + +- (void)moveWaitingTasksToInProgress:(BOOL)startTransfer { + // move parts from waiting to in progress if under the concurrency limit + while (self.isUnderConcurrencyLimit && self.hasWaitingTasks) { + //Get a part from the waitingList + AWSS3TransferUtilityUploadSubTask *nextSubTask = [self.waitingTasks objectAtIndex:0]; + + //Add to inProgress list + self.inProgressPartsDictionary[@(nextSubTask.taskIdentifier)] = nextSubTask; + nextSubTask.status = AWSS3TransferUtilityTransferStatusInProgress; + + //Remove it from the waitingList + self.waitingPartsDictionary[@(nextSubTask.taskIdentifier)] = nil; + AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(nextSubTask.taskIdentifier), self.uploadID); + + if (startTransfer) { + AWSDDLogDebug(@"Starting subTask %@", @(nextSubTask.taskIdentifier)); + NSCAssert(nextSubTask.sessionTask.state == NSURLSessionTaskStateSuspended, @"State should be suspended before resuming."); + [nextSubTask.sessionTask resume]; + } + } + + [self completeIfDone]; +} + +- (void)completeUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask + usingHTTPResponse:(NSHTTPURLResponse *)HTTPResponse { + subTask.eTag = (NSString *)HTTPResponse.allHeaderFields[@"ETAG"]; + + //Add it to completed parts and remove it from remaining parts. + [self.completedPartsSet addObject:subTask]; + self.inProgressPartsDictionary[@(subTask.taskIdentifier)] = nil; + //Update progress + self.progress.completedUnitCount = self.progress.completedUnitCount - subTask.totalBytesSent + subTask.totalBytesExpectedToSend; + + //Delete the temporary upload file for this subTask + [self.transferUtility removeFile:subTask.file]; + subTask.status = AWSS3TransferUtilityTransferStatusCompleted; + + //Update Database + [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID + partNumber:subTask.partNumber + taskIdentifier:subTask.taskIdentifier + eTag:subTask.eTag + status:subTask.status + retry_count:self.retryCount databaseQueue:self.databaseQueue]; +} + +- (void)completeIfDone { + dispatch_async(self.serialQueue, ^{ + // Complete multipart upload if in progress and waiting tasks are done + if (!self.isDone && self.status != AWSS3TransferUtilityTransferStatusCompleted) { + return; + } + + //If there are no more inProgress parts, then we are done. + + //Validate that all the content has been uploaded. + int64_t totalBytesSent = 0; + for (AWSS3TransferUtilityUploadSubTask *aSubTask in self.completedTasks) { + totalBytesSent += aSubTask.totalBytesExpectedToSend; + } + + if (totalBytesSent != self.contentLength.longLongValue ) { + NSString *errorMessage = [NSString stringWithFormat:@"Expected to send [%@], but sent [%@] and there are no remaining parts. Failing transfer ", + self.contentLength, @(totalBytesSent)]; + AWSDDLogDebug(@"%@", errorMessage); + NSDictionary *userInfo = [NSDictionary dictionaryWithObject:errorMessage + forKey:@"Message"]; + + self.error = [NSError errorWithDomain:AWSS3TransferUtilityErrorDomain + code:AWSS3TransferUtilityErrorClientError + userInfo:userInfo]; + + //Execute call back if provided. + [self.transferUtility completeTask:self]; + + //Abort the request, so the server can clean up any partials. + [self.transferUtility callAbortMultiPartForUploadTask:self]; + + //clean up. + [self.transferUtility cleanupForMultiPartUploadTask:self]; + return; + } + + AWSDDLogDebug(@"There are %lu waiting upload parts.", (unsigned long)self.waitingTasks.count); + AWSDDLogDebug(@"There are %lu in progress upload parts.", (unsigned long)self.inProgressTasks.count); + AWSDDLogDebug(@"There are %lu completed upload parts.", (unsigned long)self.completedTasks.count); + [self.transferUtility completeMultiPartForUploadTask:self]; + self.status = AWSS3TransferUtilityTransferStatusCompleted; + }); + +} + +- (void)setCompletionHandler:(AWSS3TransferUtilityMultiPartUploadCompletionHandlerBlock)completionHandler { self.expression.completionHandler = completionHandler; //If the task has already completed successfully //Or the task has completed with error, complete the task @@ -224,10 +434,14 @@ -(void) setCompletionHandler:(AWSS3TransferUtilityMultiPartUploadCompletionHandl } } --(void) setProgressBlock:(AWSS3TransferUtilityMultiPartProgressBlock)progressBlock { +- (void)setProgressBlock:(AWSS3TransferUtilityMultiPartProgressBlock)progressBlock { self.expression.progressBlock = progressBlock; } +- (void)integrateWithTransferUtility:(AWSS3TransferUtility *)transferUtility { + self.transferUtility = transferUtility; +} + @end @implementation AWSS3TransferUtilityDownloadTask @@ -239,15 +453,14 @@ - (AWSS3TransferUtilityDownloadExpression *)expression { return _expression; } --(void) cancel { +- (void)cancel { self.cancelled = YES; self.status = AWSS3TransferUtilityTransferStatusCancelled; [self.sessionTask cancel]; [AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:self.transferID databaseQueue:self.databaseQueue]; } --(void) setCompletionHandler:(AWSS3TransferUtilityDownloadCompletionHandlerBlock)completionHandler { - +- (void)setCompletionHandler:(AWSS3TransferUtilityDownloadCompletionHandlerBlock)completionHandler { self.expression.completionHandler = completionHandler; //If the task has already completed successfully //Or the task has completed with error, complete the task @@ -256,7 +469,7 @@ -(void) setCompletionHandler:(AWSS3TransferUtilityDownloadCompletionHandlerBlock } } --(void) setProgressBlock:(AWSS3TransferUtilityProgressBlock)progressBlock { +- (void)setProgressBlock:(AWSS3TransferUtilityProgressBlock)progressBlock { self.expression.progressBlock = progressBlock; } diff --git a/AWSS3/AWSS3TransferUtility_private.h b/AWSS3/AWSS3TransferUtility_private.h index 1918cabdb3f..a65e83b47b6 100644 --- a/AWSS3/AWSS3TransferUtility_private.h +++ b/AWSS3/AWSS3TransferUtility_private.h @@ -17,16 +17,36 @@ #import "AWSS3Service.h" #import "AWSS3PreSignedURL.h" +@class AWSS3TransferUtilityConfiguration; +@class AWSS3PreSignedURLBuilder; + +@interface AWSS3TransferUtility () + +- (NSError *)createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask + subTask:(AWSS3TransferUtilityUploadSubTask *)subTask; + +- (NSError *)createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask + subTask:(AWSS3TransferUtilityUploadSubTask *)subTask + startTransfer:(BOOL)startTransfer; + +- (void)completeTask:(AWSS3TransferUtilityTask *)task; +- (AWSTask *)callAbortMultiPartForUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)uploadTask; +- (void)cleanupForMultiPartUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)task; +- (void)completeMultiPartForUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask; +- (void)removeFile:(NSString *)absolutePath; + +@end + @interface AWSS3TransferUtilityTask() @property (strong, nonatomic) NSURLSessionTask *sessionTask; -@property (readwrite) NSUInteger taskIdentifier; +@property (nonatomic, readwrite) NSUInteger taskIdentifier; @property (strong, nonatomic) NSString *transferID; @property (strong, nonatomic) NSString *bucket; @property (strong, nonatomic) NSString *key; @property (strong, nonatomic) NSData *data; @property (strong, nonatomic) NSURL *location; -@property (strong, nonatomic) NSError *error; +@property (readwrite, nonatomic) NSError *error; @property int retryCount; @property (copy) NSString *nsURLSessionID; @property (copy) NSString *file; @@ -53,12 +73,37 @@ @property (copy) NSString * uploadID; @property BOOL cancelled; @property BOOL temporaryFileCreated; -@property NSMutableDictionary *waitingPartsDictionary; -@property (strong, nonatomic) NSMutableSet *completedPartsSet; +@property (readonly) BOOL isUnderConcurrencyLimit; +@property (readonly) BOOL hasWaitingTasks; +@property (readonly) BOOL isDone; +@property (strong, nonatomic) NSMutableDictionary *waitingPartsDictionary; @property (strong, nonatomic) NSMutableDictionary *inProgressPartsDictionary; +@property (strong, nonatomic) NSMutableSet *completedPartsSet; +@property (strong, nonatomic) dispatch_queue_t serialQueue; @property int partNumber; @property NSNumber *contentLength; +@property (readonly) NSArray * waitingTasks; +@property (readonly) NSArray * inProgressTasks; +@property (readonly) NSArray * completedTasks; + +@property (weak, nonatomic) AWSS3TransferUtility *transferUtility; + +- (void)integrateWithTransferUtility:(AWSS3TransferUtility *)transferUtility; +- (void)addUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask; +- (void)removeWaitingUploadSubTask:(NSUInteger)taskIdentifier; +- (void)removeInProgressUploadSubTask:(NSUInteger)taskIdentifier; +- (AWSS3TransferUtilityUploadSubTask *)waitingTaskForTaskIdentifier:(NSUInteger)taskIdentifier; +- (AWSS3TransferUtilityUploadSubTask *)inProgressTaskForTaskIdentifier:(NSUInteger)taskIdentifier; +- (void)moveWaitingTaskToInProgress:(AWSS3TransferUtilityUploadSubTask *)subTask; +- (void)moveWaitingTaskToInProgress:(AWSS3TransferUtilityUploadSubTask *)subTask startTransfer:(BOOL)startTransfer; +- (void)moveInProgressAndSuspendedTasks; +- (void)moveWaitingTasksToInProgress; +- (void)moveWaitingTasksToInProgress:(BOOL)startTransfer; +- (void)completeUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask + usingHTTPResponse:(NSHTTPURLResponse *)HTTPResponse; +- (void)completeIfDone; + @end @interface AWSS3TransferUtilityDownloadTask() diff --git a/CHANGELOG.md b/CHANGELOG.md index d3168476736..7193bba666e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ ## 2.27.10 -Features for next release +### New features + +- **AWSS3** + + - feat: implements suspend and resume for all multipart uploads (See [PR #4168](https://github.com/aws-amplify/aws-sdk-ios/pull/4168)) + ### Bug Fixes - **AWSCognito**