5
5
using System . Collections . Generic ;
6
6
using System . IO ;
7
7
using System . Linq ;
8
+ using System . Threading ;
8
9
using System . Threading . Tasks ;
10
+ using Amazon . Runtime ;
9
11
10
12
namespace Cppl . Utilities . AWS
11
13
{
@@ -21,40 +23,77 @@ public class S3UploadStream : Stream
21
23
22
24
internal class Metadata
23
25
{
24
- public string BucketName ;
25
- public string Key ;
26
26
public long PartLength = DEFAULT_PART_LENGTH ;
27
27
28
28
public int PartCount = 0 ;
29
29
public string UploadId ;
30
30
public MemoryStream CurrentStream ;
31
+ public CancellationToken CancellationToken ;
31
32
32
33
public long Position = 0 ; // based on bytes written
33
34
public long Length = 0 ; // based on bytes written or SetLength, whichever is larger (no truncation)
34
35
35
36
public List < Task > Tasks = new List < Task > ( ) ;
36
- public ConcurrentDictionary < int , string > PartETags = new ConcurrentDictionary < int , string > ( ) ;
37
+ public ConcurrentDictionary < int , PartETag > PartETags = new ConcurrentDictionary < int , PartETag > ( ) ;
38
+
39
+ public InitiateMultipartUploadRequest InitiateMultipartUploadRequest ;
37
40
}
38
41
39
- Metadata _metadata = new Metadata ( ) ;
40
- IAmazonS3 _s3 = null ;
42
+ private readonly IAmazonS3 _s3 = null ;
43
+ private Metadata _metadata = new Metadata ( ) ;
44
+
45
+ public event Action < InitiateMultipartUploadResponse > Initiated ;
46
+ public event Action < UploadPartResponse > UploadedPart ;
47
+ public event Action < StreamTransferProgressArgs > StreamTransfer ;
48
+ public event Action < CompleteMultipartUploadResponse > Completed ;
49
+
41
50
42
- public S3UploadStream ( IAmazonS3 s3 , string s3uri , long partLength = DEFAULT_PART_LENGTH )
43
- : this ( s3 , new Uri ( s3uri ) , partLength )
51
+ public S3UploadStream (
52
+ IAmazonS3 s3 ,
53
+ string s3uri ,
54
+ long partLength = DEFAULT_PART_LENGTH ,
55
+ CancellationToken token = default )
56
+ : this ( s3 , new Uri ( s3uri ) , partLength , token )
44
57
{
45
58
}
46
59
47
- public S3UploadStream ( IAmazonS3 s3 , Uri s3uri , long partLength = DEFAULT_PART_LENGTH )
48
- : this ( s3 , s3uri . Host , s3uri . LocalPath . Substring ( 1 ) , partLength )
60
+ public S3UploadStream (
61
+ IAmazonS3 s3 ,
62
+ Uri s3uri ,
63
+ long partLength = DEFAULT_PART_LENGTH ,
64
+ CancellationToken token = default )
65
+ : this ( s3 , s3uri . Host , s3uri . LocalPath . Substring ( 1 ) , partLength , token )
49
66
{
50
67
}
51
68
52
- public S3UploadStream ( IAmazonS3 s3 , string bucket , string key , long partLength = DEFAULT_PART_LENGTH )
69
+ public S3UploadStream (
70
+ IAmazonS3 s3 ,
71
+ string bucket ,
72
+ string key ,
73
+ long partLength = DEFAULT_PART_LENGTH ,
74
+ CancellationToken token = default )
75
+ : this (
76
+ s3 ,
77
+ new InitiateMultipartUploadRequest
78
+ {
79
+ BucketName = bucket ,
80
+ Key = key
81
+ } ,
82
+ partLength ,
83
+ token )
84
+ {
85
+ }
86
+
87
+ public S3UploadStream (
88
+ IAmazonS3 s3 ,
89
+ InitiateMultipartUploadRequest initiateMultipartUploadRequest ,
90
+ long partLength = DEFAULT_PART_LENGTH ,
91
+ CancellationToken token = default )
53
92
{
54
93
_s3 = s3 ;
55
- _metadata . BucketName = bucket ;
56
- _metadata . Key = key ;
57
94
_metadata . PartLength = partLength ;
95
+ _metadata . InitiateMultipartUploadRequest = initiateMultipartUploadRequest ;
96
+ _metadata . CancellationToken = token ;
58
97
}
59
98
60
99
protected override void Dispose ( bool disposing )
@@ -67,10 +106,11 @@ protected override void Dispose(bool disposing)
67
106
CompleteUpload ( ) ;
68
107
}
69
108
}
109
+
70
110
_metadata = null ;
71
111
base . Dispose ( disposing ) ;
72
112
}
73
-
113
+
74
114
public override bool CanRead => false ;
75
115
public override bool CanSeek => false ;
76
116
public override bool CanWrite => true ;
@@ -83,21 +123,26 @@ public override long Position
83
123
}
84
124
85
125
public override int Read ( byte [ ] buffer , int offset , int count ) => throw new NotImplementedException ( ) ;
126
+
86
127
public override long Seek ( long offset , SeekOrigin origin ) => throw new NotImplementedException ( ) ;
87
128
88
129
public override void SetLength ( long value )
89
130
{
90
131
_metadata . Length = Math . Max ( _metadata . Length , value ) ;
91
- _metadata . PartLength = Math . Max ( MIN_PART_LENGTH , Math . Min ( MAX_PART_LENGTH , _metadata . Length / MAX_PART_COUNT ) ) ;
132
+ _metadata . PartLength =
133
+ Math . Max ( MIN_PART_LENGTH , Math . Min ( MAX_PART_LENGTH , _metadata . Length / MAX_PART_COUNT ) ) ;
92
134
}
93
135
94
136
private void StartNewPart ( )
95
137
{
96
- if ( _metadata . CurrentStream != null ) {
138
+ if ( _metadata . CurrentStream != null )
139
+ {
97
140
Flush ( false ) ;
98
141
}
142
+
99
143
_metadata . CurrentStream = new MemoryStream ( ) ;
100
- _metadata . PartLength = Math . Min ( MAX_PART_LENGTH , Math . Max ( _metadata . PartLength , ( _metadata . PartCount / 2 + 1 ) * MIN_PART_LENGTH ) ) ;
144
+ _metadata . PartLength = Math . Min ( MAX_PART_LENGTH ,
145
+ Math . Max ( _metadata . PartLength , ( _metadata . PartCount / 2 + 1 ) * MIN_PART_LENGTH ) ) ;
101
146
}
102
147
103
148
public override void Flush ( )
@@ -109,39 +154,60 @@ private void Flush(bool disposing)
109
154
{
110
155
if ( ( _metadata . CurrentStream == null || _metadata . CurrentStream . Length < MIN_PART_LENGTH ) &&
111
156
! disposing )
157
+ {
112
158
return ;
159
+ }
113
160
114
- if ( _metadata . UploadId == null ) {
115
- _metadata . UploadId = _s3 . InitiateMultipartUploadAsync ( new InitiateMultipartUploadRequest ( )
116
- {
117
- BucketName = _metadata . BucketName ,
118
- Key = _metadata . Key
119
- } ) . GetAwaiter ( ) . GetResult ( ) . UploadId ;
161
+ if ( _metadata . UploadId == null )
162
+ {
163
+ var response = _s3
164
+ . InitiateMultipartUploadAsync ( _metadata . InitiateMultipartUploadRequest , _metadata . CancellationToken )
165
+ . GetAwaiter ( ) . GetResult ( ) ;
166
+ _metadata . CancellationToken . ThrowIfCancellationRequested ( ) ;
167
+ Initiated ? . Invoke ( response ) ;
168
+ _metadata . UploadId = response . UploadId ;
120
169
}
121
-
170
+
122
171
if ( _metadata . CurrentStream != null )
123
172
{
124
173
var i = ++ _metadata . PartCount ;
125
174
126
175
_metadata . CurrentStream . Seek ( 0 , SeekOrigin . Begin ) ;
127
176
var request = new UploadPartRequest ( )
128
177
{
129
- BucketName = _metadata . BucketName ,
130
- Key = _metadata . Key ,
178
+ BucketName = _metadata . InitiateMultipartUploadRequest . BucketName ,
179
+ Key = _metadata . InitiateMultipartUploadRequest . Key ,
131
180
UploadId = _metadata . UploadId ,
132
181
PartNumber = i ,
133
182
IsLastPart = disposing ,
134
- InputStream = _metadata . CurrentStream
183
+ InputStream = _metadata . CurrentStream ,
184
+ ChecksumAlgorithm = _metadata . InitiateMultipartUploadRequest . ChecksumAlgorithm ,
135
185
} ;
136
186
_metadata . CurrentStream = null ;
187
+ request . StreamTransferProgress += ( _ , progressArgs ) => { StreamTransfer ? . Invoke ( progressArgs ) ; } ;
137
188
138
- var upload = Task . Run ( async ( ) =>
139
- {
140
- var response = await _s3 . UploadPartAsync ( request ) ;
141
- _metadata . PartETags . AddOrUpdate ( i , response . ETag ,
142
- ( n , s ) => response . ETag ) ;
143
- request . InputStream . Dispose ( ) ;
144
- } ) ;
189
+ var upload = Task . Run (
190
+ async ( ) =>
191
+ {
192
+ var response = await _s3 . UploadPartAsync ( request , _metadata . CancellationToken ) ;
193
+
194
+ _metadata . CancellationToken . ThrowIfCancellationRequested ( ) ;
195
+
196
+ UploadedPart ? . Invoke ( response ) ;
197
+
198
+ var partETag = new PartETag
199
+ {
200
+ PartNumber = response . PartNumber ,
201
+ ETag = response . ETag ,
202
+ ChecksumSHA1 = response . ChecksumSHA1 ,
203
+ ChecksumSHA256 = response . ChecksumSHA256 ,
204
+ ChecksumCRC32 = response . ChecksumCRC32 ,
205
+ ChecksumCRC32C = response . ChecksumCRC32C ,
206
+ } ;
207
+
208
+ _metadata . PartETags . AddOrUpdate ( i , partETag , ( n , s ) => partETag ) ;
209
+ request . InputStream . Dispose ( ) ;
210
+ } ) ;
145
211
_metadata . Tasks . Add ( upload ) ;
146
212
}
147
213
}
@@ -150,14 +216,19 @@ private void CompleteUpload()
150
216
{
151
217
Task . WaitAll ( _metadata . Tasks . ToArray ( ) ) ;
152
218
153
- if ( Length > 0 ) {
154
- _s3 . CompleteMultipartUploadAsync ( new CompleteMultipartUploadRequest ( )
155
- {
156
- BucketName = _metadata . BucketName ,
157
- Key = _metadata . Key ,
158
- PartETags = _metadata . PartETags . Select ( e => new PartETag ( e . Key , e . Value ) ) . ToList ( ) ,
159
- UploadId = _metadata . UploadId
160
- } ) . GetAwaiter ( ) . GetResult ( ) ;
219
+ if ( Length > 0 )
220
+ {
221
+ _metadata . CancellationToken . ThrowIfCancellationRequested ( ) ;
222
+
223
+ var response = _s3 . CompleteMultipartUploadAsync (
224
+ new CompleteMultipartUploadRequest ( )
225
+ {
226
+ BucketName = _metadata . InitiateMultipartUploadRequest . BucketName ,
227
+ Key = _metadata . InitiateMultipartUploadRequest . Key ,
228
+ PartETags = _metadata . PartETags . Values . ToList ( ) ,
229
+ UploadId = _metadata . UploadId ,
230
+ } , _metadata . CancellationToken ) . GetAwaiter ( ) . GetResult ( ) ;
231
+ Completed ? . Invoke ( response ) ;
161
232
}
162
233
}
163
234
@@ -171,8 +242,20 @@ public override void Write(byte[] buffer, int offset, int count)
171
242
var c = Math . Min ( count , buffer . Length - offset ) ; // don't over-read the buffer, even if asked to
172
243
do
173
244
{
245
+ if ( _metadata . CancellationToken . IsCancellationRequested )
246
+ {
247
+ return ;
248
+ }
249
+
174
250
if ( _metadata . CurrentStream == null || _metadata . CurrentStream . Length >= _metadata . PartLength )
251
+ {
175
252
StartNewPart ( ) ;
253
+ }
254
+
255
+ if ( _metadata . CurrentStream == null )
256
+ {
257
+ throw new ArgumentNullException ( nameof ( Metadata . CurrentStream ) ) ;
258
+ }
176
259
177
260
var remaining = _metadata . PartLength - _metadata . CurrentStream . Length ;
178
261
var w = Math . Min ( c , ( int ) remaining ) ;
@@ -184,4 +267,4 @@ public override void Write(byte[] buffer, int offset, int count)
184
267
} while ( c > 0 ) ;
185
268
}
186
269
}
187
- }
270
+ }
0 commit comments