@@ -138,31 +138,54 @@ async def get_request(
138
138
input_requests = list (input_requests )
139
139
140
140
total_requests = len (input_requests )
141
- request_index = 0
141
+ assert total_requests > 0 , "No requests provided."
142
142
143
- for request in input_requests :
143
+ # Precompute delays among requests to minimize request send lag
144
+ request_rates = []
145
+ delay_ts = []
146
+ for request_index , request in enumerate (input_requests ):
144
147
current_request_rate = _get_current_request_rate (ramp_up_strategy ,
145
148
ramp_up_start_rps ,
146
149
ramp_up_end_rps ,
147
150
request_index ,
148
151
total_requests ,
149
152
request_rate )
150
-
151
- yield request , current_request_rate
152
-
153
- request_index += 1
154
-
153
+ request_rates .append (current_request_rate )
155
154
if current_request_rate == float ("inf" ):
156
- # If the request rate is infinity, then we don't need to wait.
157
- continue
158
-
159
- theta = 1.0 / (current_request_rate * burstiness )
160
-
161
- # Sample the request interval from the gamma distribution.
162
- # If burstiness is 1, it follows exponential distribution.
163
- interval = np .random .gamma (shape = burstiness , scale = theta )
164
- # The next request will be sent after the interval.
165
- await asyncio .sleep (interval )
155
+ delay_ts .append (0 )
156
+ else :
157
+ theta = 1.0 / (current_request_rate * burstiness )
158
+
159
+ # Sample the request interval from the gamma distribution.
160
+ # If burstiness is 1, it follows exponential distribution.
161
+ delay_ts .append (np .random .gamma (shape = burstiness , scale = theta ))
162
+
163
+ # Calculate the cumulative delay time from the first sent-out request.
164
+ for i in range (1 , len (delay_ts )):
165
+ delay_ts [i ] += delay_ts [i - 1 ]
166
+ if ramp_up_strategy is None and delay_ts [- 1 ] != 0 :
167
+ # When ramp_up_strategy is not set, we assume the request rate is fixed
168
+ # and all requests should be sent in target_total_delay_s, the following
169
+ # logic would re-scale delay time to ensure the final delay_ts
170
+ # align with target_total_delay_s.
171
+ #
172
+ # NOTE: If we simply accumulate the random delta values
173
+ # from the gamma distribution, their sum would have 1-2% gap
174
+ # from target_total_delay_s. The purpose of the following logic is to
175
+ # close the gap for stabilizing the throughput data
176
+ # from different random seeds.
177
+ target_total_delay_s = total_requests / request_rate
178
+ normalize_factor = target_total_delay_s / delay_ts [- 1 ]
179
+ delay_ts = [delay * normalize_factor for delay in delay_ts ]
180
+
181
+ start_ts = time .time ()
182
+ request_index = 0
183
+ for request_index , request in enumerate (input_requests ):
184
+ current_ts = time .time ()
185
+ sleep_interval_s = start_ts + delay_ts [request_index ] - current_ts
186
+ if sleep_interval_s > 0 :
187
+ await asyncio .sleep (sleep_interval_s )
188
+ yield request , request_rates [request_index ]
166
189
167
190
168
191
def calculate_metrics (
0 commit comments