@@ -51,8 +51,11 @@ def __call__(self, request_records: List[RequestRecord]) -> List[RequestRecord]:
51
51
class SampleRequests (RequestProcessor ): # pylint: disable=too-few-public-methods
52
52
"""The processor that samples requests out from the given request list."""
53
53
54
- def __init__ (self , num_requests : int ) -> None :
54
+ def __init__ (self , num_requests : int , take_first_x_requests : bool = False ) -> None :
55
55
self .num_requests = num_requests
56
+ # If `take_first_x_requests` is True, the first `num_requests` requests
57
+ # are returned and sampling will not happen.
58
+ self .take_first_x_requests = take_first_x_requests
56
59
57
60
def __call__ (self , request_records : List [RequestRecord ]) -> List [RequestRecord ]:
58
61
assert len (request_records ) > 0 , "Empty input request record."
@@ -69,12 +72,20 @@ def _sample_from_plain_request_records(
69
72
self , request_records : List [RequestRecord ]
70
73
) -> List [RequestRecord ]:
71
74
samples : List [RequestRecord ] = []
72
- while len (samples ) < self .num_requests :
73
- # Create a new list so that the in-place shuffle does not mutate the input list.
74
- records = list (request_records )
75
- random .shuffle (records )
76
- samples += copy .deepcopy (records )
77
- samples = samples [: self .num_requests ]
75
+ if self .take_first_x_requests :
76
+ if len (request_records ) < self .num_requests :
77
+ raise ValueError (
78
+ f"Insufficient requests. Requiring { self .num_requests } requests "
79
+ f"but only { len (request_records )} are available."
80
+ )
81
+ samples = copy .deepcopy (list (request_records [: self .num_requests ]))
82
+ else :
83
+ while len (samples ) < self .num_requests :
84
+ # Create a new list so that the in-place shuffle does not mutate the input list.
85
+ records = list (request_records )
86
+ random .shuffle (records )
87
+ samples += copy .deepcopy (records )
88
+ samples = samples [: self .num_requests ]
78
89
for i , record in enumerate (samples ):
79
90
record .request_id = i
80
91
return samples
@@ -95,7 +106,8 @@ def _sample_from_grouped_request_records(
95
106
96
107
# Create a new list so that the in-place shuffle does not mutate the input list.
97
108
records = list (grouped_request_records )
98
- random .shuffle (records )
109
+ if not self .take_first_x_requests :
110
+ random .shuffle (records )
99
111
remaining = self .num_requests
100
112
samples : List [RequestRecord ] = []
101
113
for grouped_request_record in grouped_request_records :
@@ -183,6 +195,22 @@ def __call__(self, request_records: List[RequestRecord]) -> List[RequestRecord]:
183
195
return request_records
184
196
185
197
198
+ class ScaleTimestamp (RequestProcessor ): # pylint: disable=too-few-public-methods
199
+ """Scale the timestamp of requests by the given scale factor."""
200
+
201
+ def __init__ (self , timestamp_scale : float ):
202
+ self .timestamp_scale = timestamp_scale
203
+
204
+ def __call__ (self , request_records : List [RequestRecord ]) -> List [RequestRecord ]:
205
+ for request_record in request_records :
206
+ if request_record .timestamp is None :
207
+ raise ValueError (
208
+ f"The timestamp of request { request_record } has not been initialized."
209
+ )
210
+ request_record .timestamp *= self .timestamp_scale
211
+ return request_records
212
+
213
+
186
214
class MetricAnalyzer (RequestProcessor ): # pylint: disable=too-few-public-methods
187
215
"""The processor that analyzes the raw benchmark results and computes more detailed metrics."""
188
216
@@ -463,7 +491,6 @@ def __init__( # pylint: disable=too-many-arguments
463
491
disable_tqdm : bool ,
464
492
max_schedule_gap : float ,
465
493
num_requests : int ,
466
- request_rate : Optional [np .float32 ] = None ,
467
494
) -> None :
468
495
if num_processes is None :
469
496
# We assign each process at most 32 requests to send
@@ -472,7 +499,6 @@ def __init__( # pylint: disable=too-many-arguments
472
499
super ().__init__ (f_create_api_endpoint , num_processes , disable_tqdm )
473
500
self .max_schedule_gap = max_schedule_gap
474
501
self .num_requests = num_requests
475
- self .request_rate = request_rate
476
502
477
503
def __call__ (self , request_records : List [RequestRecord ]) -> List [RequestRecord ]:
478
504
assert len (request_records ) > 0
@@ -574,7 +600,7 @@ async def _task(request_record: RequestRecord) -> None:
574
600
)
575
601
576
602
577
- def create_pipelines (
603
+ def create_pipelines ( # pylint: disable=too-many-branches
578
604
args : argparse .Namespace , f_create_api_endpoint : Callable [[], APIEndPoint ], dataset : Dataset
579
605
) -> List [RequestProcessor ]:
580
606
"""Creating request processing pipelines with regard to the specified args."""
@@ -586,6 +612,10 @@ def create_pipelines(
586
612
'Both "num_concurrent_requests" and "request_rate" are specified. '
587
613
"Please specify only one of them."
588
614
)
615
+ if args .replay_timestamp_scale is not None :
616
+ raise ValueError (
617
+ "Dataset replay is unsupported when fixing number of concurrent requests."
618
+ )
589
619
for num_concurrent_requests in args .num_concurrent_requests :
590
620
num_warmup_requests = (
591
621
args .num_warmup_requests
@@ -622,6 +652,8 @@ def create_pipelines(
622
652
"Please specify the number of warmup requests via "
623
653
'"--num-warmup-requests" when fixing request rate.'
624
654
)
655
+ if args .replay_timestamp_scale is not None :
656
+ raise ValueError ("Dataset replay is unsupported when fixing request rates." )
625
657
num_total_requests = int (
626
658
args .num_requests if not args .per_gpu_workload else args .num_requests * args .num_gpus
627
659
)
@@ -649,15 +681,55 @@ def create_pipelines(
649
681
args .disable_tqdm ,
650
682
args .max_schedule_gap ,
651
683
args .num_requests ,
652
- request_rate ,
653
684
),
654
685
cuda_profile_url = cuda_profile_url ,
655
686
fake_warmup = dataset .require_fake_warmup ,
656
687
),
657
688
)
658
689
for request_rate in args .request_rate
659
690
]
660
- raise ValueError (
661
- 'Unable to create executor. Please specify one of "num_concurrent_requests" '
662
- 'and "request_rate".'
663
- )
691
+
692
+ # Default: dataset replay mode
693
+ # The dataset must come with timestamps.
694
+ if not dataset .timestamp_available :
695
+ raise ValueError (
696
+ "The dataset does not have timestamps, so dataset replay is unsupported. "
697
+ 'Please specify one of "num_concurrent_requests" '
698
+ 'and "request_rate".'
699
+ )
700
+ if args .per_gpu_workload :
701
+ raise ValueError ("Fixing per-GPU workload is not compatible with dataset replay." )
702
+ if args .num_warmup_requests is None :
703
+ raise ValueError (
704
+ "Please specify the number of warmup requests via "
705
+ '"--num-warmup-requests" for dataset replay.'
706
+ )
707
+ timestamp_scale = args .replay_timestamp_scale or 1.0
708
+ if dataset .require_fake_warmup :
709
+ num_samples = args .num_requests
710
+ else :
711
+ num_samples = args .num_requests + args .num_warmup_requests
712
+ return [
713
+ SequentialProcessor (
714
+ LogMessage (f"Dataset replay with time scaling of { timestamp_scale } " ),
715
+ SampleRequests (num_samples , take_first_x_requests = True ),
716
+ AttachModelName (args .tokenizer ),
717
+ ScaleTimestamp (timestamp_scale ),
718
+ AttachStreamFlag (args .stream ),
719
+ AttachSamplingOptions (args .temperature , args .top_p , args .ignore_eos ),
720
+ AttachExecutionFeature ({"timestamp_scale" : timestamp_scale }),
721
+ WarmupAndRun (
722
+ num_warmup_requests = args .num_warmup_requests ,
723
+ num_benchmark_requests = args .num_requests ,
724
+ pipeline = FixTimestampExecutor (
725
+ f_create_api_endpoint ,
726
+ args .num_process_workers ,
727
+ args .disable_tqdm ,
728
+ args .max_schedule_gap ,
729
+ args .num_requests ,
730
+ ),
731
+ cuda_profile_url = cuda_profile_url ,
732
+ fake_warmup = dataset .require_fake_warmup ,
733
+ ),
734
+ )
735
+ ]
0 commit comments