Commit 47adbd2

Limitation of number of concurrent running requests
1 parent b847393 commit 47adbd2

File tree

5 files changed: +81 -29 lines changed

- README.md
- cmd/vllm-sim/main.go
- pkg/vllm-sim/defs.go
- pkg/vllm-sim/metrics.go
- pkg/vllm-sim/simulator.go

README.md

Lines changed: 1 addition & 0 deletions
@@ -81,6 +81,7 @@ For more details see the <a href="https://docs.vllm.ai/en/stable/getting_started
 - `inter-token-latency`: the time to 'generate' each additional token (in milliseconds), optional, by default zero
 - `max-loras`: maximum number of LoRAs in a single batch, optional, default is one
 - `max-cpu-loras`: maximum number of LoRAs to store in CPU memory, optional, must be >= max_loras, default is max_loras
+- `max-running-requests`: maximum number of inference requests that could be processed at the same time

 ## Working with docker image
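With this line in place, the concurrency cap is documented alongside the simulator's other flags. For illustration only (the binary name and flag values here are assumptions, not taken from the README), an invocation such as `vllm-sim --max-running-requests 10` would allow at most ten requests to be processed at once, with further requests held until a worker frees up; the default set in the code below is 5.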

cmd/vllm-sim/main.go

Lines changed: 2 additions & 2 deletions
@@ -31,12 +31,12 @@ func main() {
 	// setup logger and context with graceful shutdown
 	logger := klog.Background()
 	ctx := klog.NewContext(context.Background(), logger)
-	_ = signals.SetupSignalHandler(ctx)
+	ctx = signals.SetupSignalHandler(ctx)

 	logger.Info("Start vllm simulator")

 	vllmSim := vllmsim.New(logger)
-	err := vllmSim.Start()
+	err := vllmSim.Start(ctx)

 	if err != nil {
 		logger.Error(err, "VLLM simulator failed")
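Previously the context returned by signals.SetupSignalHandler was discarded; now it replaces ctx, so a SIGINT/SIGTERM cancels the context that Start and, through it, the request-processing workers receive. The helper itself is not part of this diff; the following is only a rough sketch of what such a function typically does, assuming it is built on the standard library's signal.NotifyContext (the repository's actual implementation may differ):

```go
// Sketch of a typical signal-handler helper; not the repository's actual code.
package signals

import (
	"context"
	"os"
	"os/signal"
	"syscall"
)

// SetupSignalHandler returns a copy of the parent context that is cancelled
// when the process receives SIGINT or SIGTERM.
func SetupSignalHandler(parent context.Context) context.Context {
	ctx, stop := signal.NotifyContext(parent, os.Interrupt, syscall.SIGTERM)
	// Once the context is done, release the signal registration so that a
	// second signal falls back to the default (terminating) behavior.
	go func() {
		<-ctx.Done()
		stop()
	}()
	return ctx
}
```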

pkg/vllm-sim/defs.go

Lines changed: 13 additions & 0 deletions
@@ -23,6 +23,7 @@ import (

 	"github.com/go-logr/logr"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/valyala/fasthttp"
 )

 const (

@@ -58,6 +59,9 @@ type VllmSimulator struct {
 	runningLoras sync.Map
 	// waitingLoras will represent collection of loras defined in requests in the queue - Not implemented yet
 	waitingLoras sync.Map
+	// maxRunningReqs defines the maximum number of inference requests that could be processed at the same time
+	maxRunningReqs int64
+	// nRunningReqs is the number of inference requests that are currently being processed
 	nRunningReqs int64
 	// loraInfo is prometheus gauge
 	loraInfo *prometheus.GaugeVec

@@ -67,6 +71,8 @@ type VllmSimulator struct {
 	waitingRequests *prometheus.GaugeVec
 	// kvCacheUsagePercentage is prometheus gauge
 	kvCacheUsagePercentage *prometheus.GaugeVec
+	// reqChan is the channel for requests to be passed to workers
+	reqChan chan *completionReqCtx
 }

 // baseResponseChoice contains base completion response's choice related information

@@ -117,6 +123,13 @@ type completionRequest interface {
 	getModel() string
 }

+type completionReqCtx struct {
+	completionReq    completionRequest
+	httpReqCtx       *fasthttp.RequestCtx
+	isChatCompletion bool
+	wg               *sync.WaitGroup
+}
+
 // v1/chat/completion
 // message defines vLLM chat completion message
 type message struct {
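The new completionReqCtx type is the unit of work handed from the HTTP handler to the worker pool: it carries the parsed request, the fasthttp request context the response must be written to, and a WaitGroup the handler blocks on until a worker finishes. Stripped of the simulator's specifics, the handoff looks roughly like the sketch below (the job type, channel, and worker count are illustrative stand-ins, not the repository's identifiers):

```go
package main

import (
	"fmt"
	"sync"
)

// job mirrors the role of completionReqCtx: a payload plus a WaitGroup
// that lets the submitter block until the work is finished.
type job struct {
	payload string
	wg      *sync.WaitGroup
}

func worker(id int, jobs <-chan *job) {
	for j := range jobs {
		fmt.Printf("worker %d handled %q\n", id, j.payload)
		j.wg.Done() // unblock the submitter
	}
}

func main() {
	jobs := make(chan *job) // unbuffered: submitters wait for a free worker
	for i := 1; i <= 2; i++ {
		go worker(i, jobs)
	}

	// Each "handler" submits one job and waits for its completion,
	// just as handleCompletions now waits on reqCtx.wg.
	var handlers sync.WaitGroup
	for _, p := range []string{"req-a", "req-b", "req-c"} {
		handlers.Add(1)
		go func(p string) {
			defer handlers.Done()
			var wg sync.WaitGroup
			wg.Add(1)
			jobs <- &job{payload: p, wg: &wg}
			wg.Wait()
		}(p)
	}
	handlers.Wait()
}
```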

pkg/vllm-sim/metrics.go

Lines changed: 3 additions & 1 deletion
@@ -21,6 +21,7 @@ package vllmsim
 import (
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"time"

 	vllmapi "github.com/neuralmagic/vllm-sim/pkg/vllm-api"

@@ -131,6 +132,7 @@ func (s *VllmSimulator) reportLoras() {

 // reportRequests sets information about running completion requests
 func (s *VllmSimulator) reportRequests() {
+	nRunningReqs := atomic.LoadInt64(&(s.nRunningReqs))
 	s.runningRequests.WithLabelValues(
-		s.model).Set(float64(s.nRunningReqs))
+		s.model).Set(float64(nRunningReqs))
 }
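Reading the counter through atomic.LoadInt64 matches the atomic.AddInt64 writes performed by the workers; pairing an atomic write with a plain field read would be a data race under the Go memory model. A self-contained sketch of that write/read pairing (the counter name and printed snapshot are illustrative, not the simulator's gauge wiring):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var inFlight int64
	var wg sync.WaitGroup

	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&inFlight, 1)        // request starts
			defer atomic.AddInt64(&inFlight, -1) // request finishes
			// ... simulated work would go here ...
		}()
	}

	// A metrics reporter running concurrently must read via an atomic load;
	// accessing inFlight directly here would be a data race.
	snapshot := atomic.LoadInt64(&inFlight)
	fmt.Println("in-flight requests observed:", snapshot)

	wg.Wait()
}
```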

pkg/vllm-sim/simulator.go

Lines changed: 62 additions & 26 deletions
@@ -18,10 +18,12 @@ limitations under the License.
 package vllmsim

 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"net"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -38,12 +40,13 @@ import (
 // New creates a new VllmSimulator instance with the given logger
 func New(logger logr.Logger) *VllmSimulator {
 	return &VllmSimulator{
-		logger: logger,
+		logger:  logger,
+		reqChan: make(chan *completionReqCtx),
 	}
 }

 // Start starts the simulator
-func (s *VllmSimulator) Start() error {
+func (s *VllmSimulator) Start(ctx context.Context) error {
 	// parse command line parameters
 	err := s.parseCommandParams()
 	if err != nil {

@@ -56,6 +59,10 @@ func (s *VllmSimulator) Start() error {
 		return err
 	}

+	// run request processing workers
+	for i := 1; i <= int(s.maxRunningReqs); i++ {
+		go s.reqProcessingWorker(ctx, i)
+	}
 	// start the http server
 	return s.startServer()
 }

@@ -71,6 +78,7 @@ func (s *VllmSimulator) parseCommandParams() error {
 	pflag.StringVar(&lorasStr, "lora", "", "List of LoRA adapters, separated by comma")
 	pflag.IntVar(&s.maxLoras, "max-loras", 1, "Maximum number of LoRAs in a single batch")
 	pflag.IntVar(&s.maxCpuLoras, "max-cpu-loras", 0, "Maximum number of LoRAs to store in CPU memory")
+	pflag.Int64Var(&s.maxRunningReqs, "max-running-requests", 5, "Maximum number of inference requests that could be processed at the same time (parameter to simulate requests waiting queue)")

 	pflag.Parse()

@@ -250,32 +258,60 @@ func (s *VllmSimulator) handleCompletions(ctx *fasthttp.RequestCtx, isChatComple
 		return
 	}

-	if s.isLora(model) {
-		// if current request's model is LoRA, add it to the list of running loras
-		value, ok := s.runningLoras.Load(model)
-		intValue := 0
-
-		if !ok {
-			s.logger.Info("Create reference counter", "model", model)
-			intValue = 0
-		} else {
-			intValue = value.(int)
-		}
-		s.runningLoras.Store(model, intValue+1)
-		s.logger.Info("Update LoRA reference counter", "model", model, "old value", intValue, "new value", intValue+1)
-
-		// TODO - check if this request went to the waiting queue - add it to waiting map
-		s.reportLoras()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	reqCtx := &completionReqCtx{
+		completionReq:    vllmReq,
+		httpReqCtx:       ctx,
+		isChatCompletion: isChatCompletion,
+		wg:               &wg,
 	}
-	atomic.AddInt64(&(s.nRunningReqs), 1)
-	s.reportRequests()
-
-	responseTxt := vllmReq.createResponseText(s.mode)
+	s.reqChan <- reqCtx
+	wg.Wait()
+}

-	if vllmReq.isStream() {
-		s.sendStreamingResponse(isChatCompletion, ctx, responseTxt, vllmReq.getModel())
-	} else {
-		s.sendResponse(isChatCompletion, ctx, responseTxt, vllmReq.getModel())
+func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
+	for {
+		select {
+		case <-ctx.Done():
+			s.logger.Info("reqProcessingWorker stopped:", "worker id", id)
+			return
+		case reqCtx, ok := <-s.reqChan:
+			if !ok {
+				s.logger.Info("reqProcessingWorker worker exiting: reqChan closed")
+				return
+			}
+			req := reqCtx.completionReq
+			model := req.getModel()
+			if s.isLora(model) {
+				// if current request's model is LoRA, add it to the list of running loras
+				value, ok := s.runningLoras.Load(model)
+				intValue := 0
+
+				if !ok {
+					s.logger.Info("Create reference counter", "model", model)
+					intValue = 0
+				} else {
+					intValue = value.(int)
+				}
+				s.runningLoras.Store(model, intValue+1)
+				s.logger.Info("Update LoRA reference counter", "model", model, "old value", intValue, "new value", intValue+1)
+
+				// TODO - check if this request went to the waiting queue - add it to waiting map
+				s.reportLoras()
+			}
+			atomic.AddInt64(&(s.nRunningReqs), 1)
+			s.reportRequests()
+
+			responseTxt := req.createResponseText(s.mode)
+
+			if req.isStream() {
+				s.sendStreamingResponse(reqCtx.isChatCompletion, reqCtx.httpReqCtx, responseTxt, model)
+			} else {
+				s.sendResponse(reqCtx.isChatCompletion, reqCtx.httpReqCtx, responseTxt, model)
+			}
+			reqCtx.wg.Done()
+		}
 	}
 }
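The concurrency cap comes from the combination of an unbuffered reqChan and a fixed number of reqProcessingWorker goroutines: at most maxRunningReqs requests can be inside a worker at once, and any additional handleCompletions calls block on the channel send, which is what simulates a waiting queue. A common alternative that achieves the same bound without dedicated workers is a buffered-channel semaphore; a minimal sketch of that variant (not what this commit implements; names and timings are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxRunning = 3
	sem := make(chan struct{}, maxRunning) // channel capacity == concurrency limit

	var wg sync.WaitGroup
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks while 3 are in flight
			defer func() { <-sem }() // release the slot when done
			fmt.Println("processing request", id)
			time.Sleep(50 * time.Millisecond) // stand-in for inference work
		}(i)
	}
	wg.Wait()
}
```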
