Skip to content

Commit b95c995

Browse files
committed
😊 基于基于令牌桶算法改进 RateLimitedStream 带应用速率限制的流
1 parent b88a3b4 commit b95c995

File tree

3 files changed

+206
-97
lines changed

3 files changed

+206
-97
lines changed

src/HttpAgent/src/HttpAgent.xml

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6925,33 +6925,52 @@
69256925
<summary>
69266926
带应用速率限制的流
69276927
</summary>
6928+
<remarks>
6929+
<para>基于令牌桶算法(Token Bucket Algorithm)实现流量控制和速率限制。</para>
6930+
<para>参考文献:https://baike.baidu.com/item/令牌桶算法/6597000。</para>
6931+
</remarks>
6932+
</member>
6933+
<member name="F:HttpAgent.RateLimitedStream.CHUNK_SIZE">
6934+
<summary>
6935+
单次读取或写入操作中处理的最大数据块大小
6936+
</summary>
69286937
</member>
69296938
<member name="F:HttpAgent.RateLimitedStream._bytesPerSecond">
69306939
<summary>
6931-
每秒允许的最大字节数
6940+
每秒允许传输的最大字节数
69326941
</summary>
69336942
</member>
69346943
<member name="F:HttpAgent.RateLimitedStream._innerStream">
69356944
<inheritdoc cref="T:System.IO.Stream" />
69366945
</member>
6946+
<member name="F:HttpAgent.RateLimitedStream._lockObject">
6947+
<summary>
6948+
用于同步访问的锁对象
6949+
</summary>
6950+
</member>
69376951
<member name="F:HttpAgent.RateLimitedStream._stopwatch">
69386952
<summary>
6939-
用于精确计时的 <see cref="T:System.Diagnostics.Stopwatch" /> 实例
6953+
用来计算时间间隔的计时器
6954+
</summary>
6955+
</member>
6956+
<member name="F:HttpAgent.RateLimitedStream._availableTokens">
6957+
<summary>
6958+
当前可用的令牌数量(字节数)
69406959
</summary>
69416960
</member>
6942-
<member name="F:HttpAgent.RateLimitedStream._totalBytesProcessed">
6961+
<member name="F:HttpAgent.RateLimitedStream._lastTokenRefillTime">
69436962
<summary>
6944-
到目前为止已读取或写入的总字节数
6963+
上次令牌补充的时间戳
69456964
</summary>
69466965
</member>
6947-
<member name="M:HttpAgent.RateLimitedStream.#ctor(System.IO.Stream,System.Int32)">
6966+
<member name="M:HttpAgent.RateLimitedStream.#ctor(System.IO.Stream,System.Double)">
69486967
<summary>
69496968
<inheritdoc cref="T:HttpAgent.RateLimitedStream" />
69506969
</summary>
69516970
<param name="innerStream">
69526971
<see cref="T:System.IO.Stream" />
69536972
</param>
6954-
<param name="bytesPerSecond">每秒允许的最大字节数</param>
6973+
<param name="bytesPerSecond">每秒允许传输的最大字节数</param>
69556974
</member>
69566975
<member name="P:HttpAgent.RateLimitedStream.CanRead">
69576976
<inheritdoc />
@@ -6989,11 +7008,16 @@
69897008
<member name="M:HttpAgent.RateLimitedStream.Dispose(System.Boolean)">
69907009
<inheritdoc />
69917010
</member>
6992-
<member name="M:HttpAgent.RateLimitedStream.ApplyRateLimitAsync(System.Int32)">
7011+
<member name="M:HttpAgent.RateLimitedStream.RefillTokens">
7012+
<summary>
7013+
补充令牌的方法
7014+
</summary>
7015+
</member>
7016+
<member name="M:HttpAgent.RateLimitedStream.WaitForTokens(System.Int32)">
69937017
<summary>
6994-
根据设定的速率限制调整读写操作的速度
7018+
等待直到有足够令牌可用
69957019
</summary>
6996-
<param name="bytesToProcess">本次操作将处理的字节数</param>
7020+
<param name="desiredTokens">需要等待的令牌数量</param>
69977021
</member>
69987022
<member name="T:HttpAgent.ServerSentEventsData">
69997023
<summary>

src/HttpAgent/src/Models/RateLimitedStream.cs

Lines changed: 108 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,53 @@ namespace HttpAgent;
77
/// <summary>
88
/// 带应用速率限制的流
99
/// </summary>
10+
/// <remarks>
11+
/// <para>基于令牌桶算法(Token Bucket Algorithm)实现流量控制和速率限制。</para>
12+
/// <para>参考文献:https://baike.baidu.com/item/令牌桶算法/6597000。</para>
13+
/// </remarks>
1014
public sealed class RateLimitedStream : Stream
1115
{
1216
/// <summary>
13-
/// 每秒允许的最大字节数
17+
/// 单次读取或写入操作中处理的最大数据块大小
1418
/// </summary>
15-
private readonly int _bytesPerSecond;
19+
internal const int CHUNK_SIZE = 4096;
20+
21+
/// <summary>
22+
/// 每秒允许传输的最大字节数
23+
/// </summary>
24+
internal readonly double _bytesPerSecond;
1625

1726
/// <inheritdoc cref="Stream" />
1827
internal readonly Stream _innerStream;
1928

2029
/// <summary>
21-
/// 用于精确计时的 <see cref="Stopwatch" /> 实例
30+
/// 用于同步访问的锁对象
2231
/// </summary>
23-
internal readonly Stopwatch _stopwatch = new();
32+
internal readonly object _lockObject = new();
2433

2534
/// <summary>
26-
/// 到目前为止已读取或写入的总字节数
35+
/// 用来计算时间间隔的计时器
2736
/// </summary>
28-
internal long _totalBytesProcessed;
37+
internal readonly Stopwatch _stopwatch;
38+
39+
/// <summary>
40+
/// 当前可用的令牌数量(字节数)
41+
/// </summary>
42+
internal double _availableTokens;
43+
44+
/// <summary>
45+
/// 上次令牌补充的时间戳
46+
/// </summary>
47+
internal long _lastTokenRefillTime;
2948

3049
/// <summary>
3150
/// <inheritdoc cref="RateLimitedStream" />
3251
/// </summary>
3352
/// <param name="innerStream">
3453
/// <see cref="Stream" />
3554
/// </param>
36-
/// <param name="bytesPerSecond">每秒允许的最大字节数</param>
37-
public RateLimitedStream(Stream innerStream, int bytesPerSecond)
55+
/// <param name="bytesPerSecond">每秒允许传输的最大字节数</param>
56+
public RateLimitedStream(Stream innerStream, double bytesPerSecond)
3857
{
3958
// 空检查
4059
ArgumentNullException.ThrowIfNull(innerStream);
@@ -49,8 +68,14 @@ public RateLimitedStream(Stream innerStream, int bytesPerSecond)
4968
_innerStream = innerStream;
5069
_bytesPerSecond = bytesPerSecond;
5170

52-
// 启动 Stopwatch 来开始计时
53-
_stopwatch.Start();
71+
// 开始计时
72+
_stopwatch = Stopwatch.StartNew();
73+
74+
// 记录初始时间
75+
_lastTokenRefillTime = _stopwatch.ElapsedMilliseconds;
76+
77+
// 初始化可用令牌数
78+
_availableTokens = bytesPerSecond;
5479
}
5580

5681
/// <inheritdoc />
@@ -81,11 +106,14 @@ public override long Position
81106
/// <inheritdoc />
82107
public override int Read(byte[] buffer, int offset, int count)
83108
{
84-
// 根据设定的速率限制调整读写操作的速度
85-
ApplyRateLimitAsync(count).GetAwaiter().GetResult();
109+
// 确保单次读取不会超过预设的数据块大小
110+
var adjustedCount = Math.Min(count, CHUNK_SIZE);
111+
112+
// 等待直到有足够令牌可用
113+
WaitForTokens(adjustedCount);
86114

87115
// 从内部流读取数据到缓冲区
88-
return _innerStream.Read(buffer, offset, count);
116+
return _innerStream.Read(buffer, offset, adjustedCount);
89117
}
90118

91119
/// <inheritdoc />
@@ -97,11 +125,14 @@ public override int Read(byte[] buffer, int offset, int count)
97125
/// <inheritdoc />
98126
public override void Write(byte[] buffer, int offset, int count)
99127
{
100-
// 向内部流写入数据
101-
_innerStream.Write(buffer, offset, count);
128+
// 确保单次写入不会超过预设的数据块大小
129+
var adjustedCount = Math.Min(count, CHUNK_SIZE);
102130

103-
// 根据设定的速率限制调整读写操作的速度
104-
ApplyRateLimitAsync(count).GetAwaiter().GetResult();
131+
// 等待直到有足够令牌可用
132+
WaitForTokens(adjustedCount);
133+
134+
// 向内部流写入数据
135+
_innerStream.Write(buffer, offset, adjustedCount);
105136
}
106137

107138
/// <inheritdoc />
@@ -111,35 +142,78 @@ protected override void Dispose(bool disposing)
111142
if (disposing)
112143
{
113144
_innerStream.Dispose();
145+
_stopwatch.Stop();
114146
}
115147

116148
base.Dispose(disposing);
117149
}
118150

119151
/// <summary>
120-
/// 根据设定的速率限制调整读写操作的速度
152+
/// 补充令牌的方法
121153
/// </summary>
122-
/// <param name="bytesToProcess">本次操作将处理的字节数</param>
123-
internal async Task ApplyRateLimitAsync(int bytesToProcess)
154+
internal void RefillTokens()
124155
{
125-
// 自开始以来经过的时间(秒)
126-
var elapsedSeconds = _stopwatch.ElapsedMilliseconds / 1000.0;
156+
// 获取当前计时器的时间
157+
var now = _stopwatch.ElapsedMilliseconds;
127158

128-
// 根据速率预期应读取的字节数
129-
var totalBytesExpected = elapsedSeconds * _bytesPerSecond;
159+
// 计算自上次填充令牌以来经过的时间
160+
var timePassed = now - _lastTokenRefillTime;
130161

131-
// 计算实际与预期之差
132-
var bytesOverLimit = _totalBytesProcessed + bytesToProcess - totalBytesExpected;
133-
134-
if (bytesOverLimit > 0)
162+
// 如果时间没有流逝或者流逝时间不足以产生新的令牌,则直接返回
163+
if (timePassed <= 0)
135164
{
136-
// 如果实际操作超过预期,则计算需要等待的时间,并进行延迟
137-
var delayMilliseconds = (int)(bytesOverLimit / _bytesPerSecond * 1000.0);
138-
139-
await Task.Delay(delayMilliseconds).ConfigureAwait(false);
165+
return;
140166
}
141167

142-
// 更新已处理的总字节数
143-
_totalBytesProcessed += bytesToProcess;
168+
// 据每秒允许的最大字节数以及经过的时间计算可以补充的令牌数量
169+
var newTokens = _bytesPerSecond * timePassed / 1000.0;
170+
171+
// 更新可用令牌,但不超过每秒允许的最大值
172+
_availableTokens = Math.Min(_bytesPerSecond, _availableTokens + newTokens);
173+
174+
// 更新最后一次填充令牌的时间戳
175+
_lastTokenRefillTime = now;
176+
}
177+
178+
/// <summary>
179+
/// 等待直到有足够令牌可用
180+
/// </summary>
181+
/// <param name="desiredTokens">需要等待的令牌数量</param>
182+
internal void WaitForTokens(int desiredTokens)
183+
{
184+
while (true)
185+
{
186+
// 防止并发访问问题
187+
lock (_lockObject)
188+
{
189+
// 尝试补充令牌
190+
RefillTokens();
191+
192+
// 检查是否已有足够的令牌
193+
if (_availableTokens >= desiredTokens)
194+
{
195+
// 扣除所需的令牌数量
196+
_availableTokens -= desiredTokens;
197+
198+
// 如果有足够的令牌,退出循环
199+
return;
200+
}
201+
}
202+
203+
// 如果没有足够的令牌,计算还需要多少令牌
204+
var requiredTokens = desiredTokens - _availableTokens;
205+
206+
// 计算为了获得所需令牌需要等待的时间
207+
var waitTime = (int)(requiredTokens * 1000.0 / _bytesPerSecond);
208+
209+
// 添加一点额外延迟用来确保精确性,具体是增加了 5% 的延迟
210+
waitTime = (int)(waitTime * 1.05);
211+
212+
// 确保不会一次性等待过长时间,最多等待 100 毫秒
213+
if (waitTime > 0)
214+
{
215+
Thread.Sleep(Math.Min(100, waitTime));
216+
}
217+
}
144218
}
145219
}

0 commit comments

Comments
 (0)