Skip to content

Commit b8aa65a

Browse files
paulomorgadoSergio0694
authored andcommitted
Added stream over 'ReadOnlySequence<byte>'
1 parent c23e1cc commit b8aa65a

File tree

4 files changed

+765
-1
lines changed

4 files changed

+765
-1
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using CommunityToolkit.HighPerformance.Streams;
6+
using System;
7+
using System.Buffers;
8+
using System.IO;
9+
using System.Runtime.CompilerServices;
10+
11+
namespace CommunityToolkit.HighPerformance;
12+
13+
/// <summary>
14+
/// Helpers for working with the <see cref="ReadOnlySequence{T}"/> type.
15+
/// </summary>
16+
public static class ReadOnlySequenceExtensions
17+
{
18+
/// <summary>
19+
/// Returns a <see cref="Stream"/> wrapping the contents of the given <see cref="Memory{T}"/> of <see cref="byte"/> instance.
20+
/// </summary>
21+
/// <param name="sequence">The input <see cref="ReadOnlySequence{T}"/> of <see cref="byte"/> instance.</param>
22+
/// <returns>A <see cref="Stream"/> wrapping the data within <paramref name="sequence"/>.</returns>
23+
/// <remarks>
24+
/// Since this method only receives a <see cref="ReadOnlySequence{T}"/> instance, which does not track
25+
/// the lifetime of its underlying buffer, it is responsibility of the caller to manage that.
26+
/// In particular, the caller must ensure that the target buffer is not disposed as long
27+
/// as the returned <see cref="Stream"/> is in use, to avoid unexpected issues.
28+
/// </remarks>
29+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
30+
public static Stream AsStream(this ReadOnlySequence<byte> sequence)
31+
{
32+
return ReadOnlySequenceStream.Create(sequence);
33+
}
34+
}

src/CommunityToolkit.HighPerformance/Streams/MemoryStream.Validate.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5+
using System.Diagnostics.CodeAnalysis;
56
using System.IO;
67
using System.Runtime.CompilerServices;
78

@@ -24,14 +25,28 @@ public static void ValidatePosition(long position, int length)
2425
}
2526
}
2627

28+
/// <summary>
29+
/// Validates the <see cref="Stream.Position"/> argument (it needs to be in the [0, length]) range.
30+
/// </summary>
31+
/// <param name="position">The new <see cref="Stream.Position"/> value being set.</param>
32+
/// <param name="length">The maximum length of the target <see cref="Stream"/>.</param>
33+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
34+
public static void ValidatePosition(long position, long length)
35+
{
36+
if ((ulong)position > (ulong)length)
37+
{
38+
ThrowArgumentOutOfRangeExceptionForPosition();
39+
}
40+
}
41+
2742
/// <summary>
2843
/// Validates the <see cref="Stream.Read(byte[],int,int)"/> or <see cref="Stream.Write(byte[],int,int)"/> arguments.
2944
/// </summary>
3045
/// <param name="buffer">The target array.</param>
3146
/// <param name="offset">The offset within <paramref name="buffer"/>.</param>
3247
/// <param name="count">The number of elements to process within <paramref name="buffer"/>.</param>
3348
[MethodImpl(MethodImplOptions.AggressiveInlining)]
34-
public static void ValidateBuffer(byte[]? buffer, int offset, int count)
49+
public static void ValidateBuffer([NotNull] byte[]? buffer, int offset, int count)
3550
{
3651
if (buffer is null)
3752
{
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Buffers;
7+
using System.IO;
8+
using System.Runtime.CompilerServices;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
12+
namespace CommunityToolkit.HighPerformance.Streams;
13+
14+
/// <summary>
15+
/// A <see cref="Stream"/> implementation wrapping a <see cref="ReadOnlySequence{T}"/> of <see cref="byte"/> instance.
16+
/// </summary>
17+
internal sealed partial class ReadOnlySequenceStream : Stream
18+
{
19+
/// <summary>
20+
/// The <see cref="ReadOnlySequence{T}"/> instance currently in use.
21+
/// </summary>
22+
private readonly ReadOnlySequence<byte> source;
23+
24+
/// <summary>
25+
/// The current position within <see cref="source"/>.
26+
/// </summary>
27+
private int position;
28+
29+
/// <summary>
30+
/// Indicates whether or not the current instance has been disposed
31+
/// </summary>
32+
private bool disposed;
33+
34+
/// <summary>
35+
/// Initializes a new instance of the <see cref="ReadOnlySequenceStream"/> class with the specified <see cref="ReadOnlySequence{T}"/> source.
36+
/// </summary>
37+
/// <param name="source">The <see cref="ReadOnlySequence{T}"/> source.</param>
38+
public ReadOnlySequenceStream(ReadOnlySequence<byte> source)
39+
{
40+
this.source = source;
41+
}
42+
43+
/// <inheritdoc/>
44+
public sealed override bool CanRead
45+
{
46+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
47+
get => !this.disposed;
48+
}
49+
50+
/// <inheritdoc/>
51+
public sealed override bool CanSeek
52+
{
53+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
54+
get => !this.disposed;
55+
}
56+
57+
/// <inheritdoc/>
58+
public sealed override bool CanWrite
59+
{
60+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
61+
get => false;
62+
}
63+
64+
/// <inheritdoc/>
65+
public sealed override long Length
66+
{
67+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
68+
get
69+
{
70+
MemoryStream.ValidateDisposed(this.disposed);
71+
72+
return this.source.Length;
73+
}
74+
}
75+
76+
/// <inheritdoc/>
77+
public sealed override long Position
78+
{
79+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
80+
get
81+
{
82+
MemoryStream.ValidateDisposed(this.disposed);
83+
84+
return this.position;
85+
}
86+
87+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
88+
set
89+
{
90+
MemoryStream.ValidateDisposed(this.disposed);
91+
MemoryStream.ValidatePosition(value, this.source.Length);
92+
93+
this.position = unchecked((int)value);
94+
}
95+
}
96+
97+
/// <summary>
98+
/// Creates a new <see cref="Stream"/> from the input <see cref="ReadOnlySequence{T}"/> of <see cref="byte"/> instance.
99+
/// </summary>
100+
/// <param name="sequence">The input <see cref="ReadOnlySequence{T}"/> instance.</param>
101+
/// <returns>A <see cref="Stream"/> wrapping the underlying data for <paramref name="sequence"/>.</returns>
102+
public static Stream Create(ReadOnlySequence<byte> sequence)
103+
{
104+
return new ReadOnlySequenceStream(sequence);
105+
}
106+
107+
/// <inheritdoc/>
108+
public sealed override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
109+
{
110+
if (cancellationToken.IsCancellationRequested)
111+
{
112+
return Task.FromCanceled(cancellationToken);
113+
}
114+
115+
try
116+
{
117+
MemoryStream.ValidateDisposed(this.disposed);
118+
119+
if (this.position >= this.source.Length)
120+
{
121+
return Task.CompletedTask;
122+
}
123+
124+
if (this.source.IsSingleSegment)
125+
{
126+
ReadOnlyMemory<byte> buffer = this.source.First.Slice(this.position);
127+
128+
this.position = (int)this.source.Length;
129+
130+
return destination.WriteAsync(buffer, cancellationToken).AsTask();
131+
}
132+
133+
async Task CoreCopyToAsync(Stream destination, CancellationToken cancellationToken)
134+
{
135+
ReadOnlySequence<byte> sequence = this.source.Slice(this.position);
136+
137+
this.position = (int)this.source.Length;
138+
139+
foreach (ReadOnlyMemory<byte> segment in sequence)
140+
{
141+
await destination.WriteAsync(segment, cancellationToken).ConfigureAwait(false);
142+
}
143+
}
144+
145+
return CoreCopyToAsync(destination, cancellationToken);
146+
}
147+
catch (OperationCanceledException e)
148+
{
149+
return Task.FromCanceled(e.CancellationToken);
150+
}
151+
catch (Exception e)
152+
{
153+
return Task.FromException(e);
154+
}
155+
}
156+
157+
/// <inheritdoc/>
158+
public sealed override void Flush()
159+
{
160+
}
161+
162+
/// <inheritdoc/>
163+
public sealed override Task FlushAsync(CancellationToken cancellationToken)
164+
{
165+
if (cancellationToken.IsCancellationRequested)
166+
{
167+
return Task.FromCanceled(cancellationToken);
168+
}
169+
170+
return Task.CompletedTask;
171+
}
172+
173+
/// <inheritdoc/>
174+
public sealed override Task<int> ReadAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
175+
{
176+
if (cancellationToken.IsCancellationRequested)
177+
{
178+
return Task.FromCanceled<int>(cancellationToken);
179+
}
180+
181+
try
182+
{
183+
int result = Read(buffer, offset, count);
184+
185+
return Task.FromResult(result);
186+
}
187+
catch (OperationCanceledException e)
188+
{
189+
return Task.FromCanceled<int>(e.CancellationToken);
190+
}
191+
catch (Exception e)
192+
{
193+
return Task.FromException<int>(e);
194+
}
195+
}
196+
197+
public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
198+
{
199+
throw MemoryStream.GetNotSupportedException();
200+
}
201+
202+
/// <inheritdoc/>
203+
public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
204+
{
205+
throw MemoryStream.GetNotSupportedException();
206+
}
207+
208+
/// <inheritdoc/>
209+
public sealed override long Seek(long offset, SeekOrigin origin)
210+
{
211+
MemoryStream.ValidateDisposed(this.disposed);
212+
213+
long index = origin switch
214+
{
215+
SeekOrigin.Begin => offset,
216+
SeekOrigin.Current => this.position + offset,
217+
SeekOrigin.End => this.source.Length + offset,
218+
_ => MemoryStream.ThrowArgumentExceptionForSeekOrigin()
219+
};
220+
221+
MemoryStream.ValidatePosition(index, this.source.Length);
222+
223+
this.position = unchecked((int)index);
224+
225+
return index;
226+
}
227+
228+
/// <inheritdoc/>
229+
public sealed override void SetLength(long value)
230+
{
231+
throw MemoryStream.GetNotSupportedException();
232+
}
233+
234+
/// <inheritdoc/>
235+
public sealed override int Read(byte[]? buffer, int offset, int count)
236+
{
237+
MemoryStream.ValidateDisposed(this.disposed);
238+
MemoryStream.ValidateBuffer(buffer, offset, count);
239+
240+
if (this.position >= this.source.Length)
241+
{
242+
return 0;
243+
}
244+
245+
ReadOnlySequence<byte> sequence = this.source.Slice(this.position);
246+
Span<byte> destination = buffer.AsSpan(offset, count);
247+
int bytesCopied = 0;
248+
249+
foreach (ReadOnlyMemory<byte> segment in sequence)
250+
{
251+
int bytesToCopy = Math.Min(segment.Length, destination.Length);
252+
253+
segment.Span.Slice(0, bytesToCopy).CopyTo(destination);
254+
255+
destination = destination.Slice(bytesToCopy);
256+
bytesCopied += bytesToCopy;
257+
258+
this.position += bytesToCopy;
259+
260+
if (destination.Length == 0)
261+
{
262+
break;
263+
}
264+
}
265+
266+
return bytesCopied;
267+
}
268+
269+
/// <inheritdoc/>
270+
public sealed override int ReadByte()
271+
{
272+
MemoryStream.ValidateDisposed(this.disposed);
273+
274+
if (this.position == this.source.Length)
275+
{
276+
return -1;
277+
}
278+
279+
ReadOnlySequence<byte> sequence = this.source.Slice(this.position);
280+
281+
this.position++;
282+
283+
return sequence.First.Span[0];
284+
}
285+
286+
/// <inheritdoc/>
287+
public sealed override void Write(byte[]? buffer, int offset, int count)
288+
{
289+
throw MemoryStream.GetNotSupportedException();
290+
}
291+
292+
/// <inheritdoc/>
293+
public sealed override void WriteByte(byte value)
294+
{
295+
throw MemoryStream.GetNotSupportedException();
296+
}
297+
298+
/// <inheritdoc/>
299+
protected override void Dispose(bool disposing)
300+
{
301+
if (this.disposed)
302+
{
303+
return;
304+
}
305+
306+
this.disposed = true;
307+
}
308+
}

0 commit comments

Comments
 (0)