|
1 | 1 | #nullable enable
|
2 | 2 | using System;
|
3 | 3 | using System.Collections.Generic;
|
| 4 | +using System.Diagnostics; |
4 | 5 | using System.Diagnostics.CodeAnalysis;
|
5 | 6 | using System.Globalization;
|
6 | 7 | using System.IO;
|
7 | 8 | using System.Net;
|
8 | 9 | using System.Runtime.CompilerServices;
|
| 10 | +using System.Runtime.ExceptionServices; |
9 | 11 | using System.Text;
|
10 | 12 | using System.Threading;
|
11 | 13 | using System.Threading.Tasks;
|
@@ -2456,56 +2458,82 @@ private void InternalUploadFile(Stream input, string path, Flags flags, SftpUplo
|
2456 | 2458 | // create buffer of optimal length
|
2457 | 2459 | var buffer = new byte[_sftpSession.CalculateOptimalWriteLength(_bufferSize, handle)];
|
2458 | 2460 |
|
2459 |
| - var bytesRead = input.Read(buffer, 0, buffer.Length); |
| 2461 | + int bytesRead; |
2460 | 2462 | var expectedResponses = 0;
|
2461 |
| - var responseReceivedWaitHandle = new AutoResetEvent(initialState: false); |
2462 | 2463 |
|
2463 |
| - do |
| 2464 | + // We will send out all the write requests without waiting for each response. |
| 2465 | + // Afterwards, we may wait on this handle until all responses are received |
| 2466 | + // or an error has occured. |
| 2467 | + using var mres = new ManualResetEventSlim(initialState: false); |
| 2468 | + |
| 2469 | + ExceptionDispatchInfo? exception = null; |
| 2470 | + |
| 2471 | + while ((bytesRead = input.Read(buffer, 0, buffer.Length)) != 0) |
2464 | 2472 | {
|
2465 |
| - // Cancel upload |
2466 | 2473 | if (asyncResult is not null && asyncResult.IsUploadCanceled)
|
2467 | 2474 | {
|
2468 | 2475 | break;
|
2469 | 2476 | }
|
2470 | 2477 |
|
2471 |
| - if (bytesRead > 0) |
| 2478 | + exception?.Throw(); |
| 2479 | + |
| 2480 | + var writtenBytes = offset + (ulong)bytesRead; |
| 2481 | + |
| 2482 | + _ = Interlocked.Increment(ref expectedResponses); |
| 2483 | + mres.Reset(); |
| 2484 | + |
| 2485 | + _sftpSession.RequestWrite(handle, offset, buffer, offset: 0, bytesRead, wait: null, s => |
2472 | 2486 | {
|
2473 |
| - var writtenBytes = offset + (ulong)bytesRead; |
| 2487 | + var setHandle = false; |
| 2488 | + |
| 2489 | + try |
| 2490 | + { |
| 2491 | + if (Sftp.SftpSession.GetSftpException(s) is Exception ex) |
| 2492 | + { |
| 2493 | + exception = ExceptionDispatchInfo.Capture(ex); |
| 2494 | + } |
2474 | 2495 |
|
2475 |
| - _sftpSession.RequestWrite(handle, offset, buffer, offset: 0, bytesRead, wait: null, s => |
| 2496 | + if (exception is not null) |
2476 | 2497 | {
|
2477 |
| - if (s.StatusCode == StatusCodes.Ok) |
2478 |
| - { |
2479 |
| - _ = Interlocked.Decrement(ref expectedResponses); |
2480 |
| - _ = responseReceivedWaitHandle.Set(); |
| 2498 | + setHandle = true; |
| 2499 | + return; |
| 2500 | + } |
2481 | 2501 |
|
2482 |
| - asyncResult?.Update(writtenBytes); |
| 2502 | + Debug.Assert(s.StatusCode == StatusCodes.Ok); |
2483 | 2503 |
|
2484 |
| - // Call callback to report number of bytes written |
2485 |
| - if (uploadCallback is not null) |
2486 |
| - { |
2487 |
| - // Execute callback on different thread |
2488 |
| - ThreadAbstraction.ExecuteThread(() => uploadCallback(writtenBytes)); |
2489 |
| - } |
2490 |
| - } |
2491 |
| - }); |
| 2504 | + asyncResult?.Update(writtenBytes); |
| 2505 | + |
| 2506 | + // Call callback to report number of bytes written |
| 2507 | + if (uploadCallback is not null) |
| 2508 | + { |
| 2509 | + // Execute callback on different thread |
| 2510 | + ThreadAbstraction.ExecuteThread(() => uploadCallback(writtenBytes)); |
| 2511 | + } |
| 2512 | + } |
| 2513 | + finally |
| 2514 | + { |
| 2515 | + if (Interlocked.Decrement(ref expectedResponses) == 0 || setHandle) |
| 2516 | + { |
| 2517 | + mres.Set(); |
| 2518 | + } |
| 2519 | + } |
| 2520 | + }); |
2492 | 2521 |
|
2493 |
| - _ = Interlocked.Increment(ref expectedResponses); |
| 2522 | + offset += (ulong)bytesRead; |
| 2523 | + } |
2494 | 2524 |
|
2495 |
| - offset += (ulong)bytesRead; |
| 2525 | + // Make sure the read of exception cannot be executed ahead of |
| 2526 | + // the read of expectedResponses so that we do not miss an |
| 2527 | + // exception. |
2496 | 2528 |
|
2497 |
| - bytesRead = input.Read(buffer, 0, buffer.Length); |
2498 |
| - } |
2499 |
| - else if (expectedResponses > 0) |
2500 |
| - { |
2501 |
| - // Wait for expectedResponses to change |
2502 |
| - _sftpSession.WaitOnHandle(responseReceivedWaitHandle, _operationTimeout); |
2503 |
| - } |
| 2529 | + if (Volatile.Read(ref expectedResponses) != 0) |
| 2530 | + { |
| 2531 | + _sftpSession.WaitOnHandle(mres.WaitHandle, _operationTimeout); |
2504 | 2532 | }
|
2505 |
| - while (expectedResponses > 0 || bytesRead > 0); |
| 2533 | + |
| 2534 | + exception?.Throw(); |
2506 | 2535 |
|
2507 | 2536 | _sftpSession.RequestClose(handle);
|
2508 |
| - responseReceivedWaitHandle.Dispose(); |
2509 | 2537 | }
|
2510 | 2538 |
|
2511 | 2539 | private async Task InternalUploadFileAsync(Stream input, string path, CancellationToken cancellationToken)
|
|
0 commit comments