Skip to content

Commit 0cf5826

Browse files
committed
Add counting and reporting of waiting requests.
1 parent 314537f commit 0cf5826

File tree

3 files changed

+19
-5
lines changed

3 files changed

+19
-5
lines changed

pkg/vllm-sim/defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ type VllmSimulator struct {
6363
maxRunningReqs int64
6464
// nRunningReqs is the number of inference requests that are currently being processed
6565
nRunningReqs int64
66+
// nWaitingReqs is the number of inference requests that are waiting to be processed
67+
nWaitingReqs int64
6668
// loraInfo is prometheus gauge
6769
loraInfo *prometheus.GaugeVec
6870
// runningRequests is prometheus gauge

pkg/vllm-sim/metrics.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,16 @@ func (s *VllmSimulator) reportLoras() {
130130
"").Set(float64(time.Now().Unix()))
131131
}
132132

133-
// reportRequests sets information about running completion requests
134-
func (s *VllmSimulator) reportRequests() {
133+
// reportRunningRequests sets information about running completion requests
134+
func (s *VllmSimulator) reportRunningRequests() {
135135
nRunningReqs := atomic.LoadInt64(&(s.nRunningReqs))
136136
s.runningRequests.WithLabelValues(
137137
s.model).Set(float64(nRunningReqs))
138138
}
139+
140+
// reportWaitingRequests sets information about waiting completion requests
141+
func (s *VllmSimulator) reportWaitingRequests() {
142+
nWaitingReqs := atomic.LoadInt64(&(s.nWaitingReqs))
143+
s.waitingRequests.WithLabelValues(
144+
s.model).Set(float64(nWaitingReqs))
145+
}

pkg/vllm-sim/simulator.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
func New(logger logr.Logger) *VllmSimulator {
4242
return &VllmSimulator{
4343
logger: logger,
44-
reqChan: make(chan *completionReqCtx),
44+
reqChan: make(chan *completionReqCtx, 1000),
4545
}
4646
}
4747

@@ -267,6 +267,8 @@ func (s *VllmSimulator) handleCompletions(ctx *fasthttp.RequestCtx, isChatComple
267267
wg: &wg,
268268
}
269269
s.reqChan <- reqCtx
270+
atomic.StoreInt64(&(s.nWaitingReqs), int64(len(s.reqChan)))
271+
s.reportWaitingRequests()
270272
wg.Wait()
271273
}
272274

@@ -281,6 +283,9 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
281283
s.logger.Info("reqProcessingWorker worker exiting: reqChan closed")
282284
return
283285
}
286+
atomic.StoreInt64(&(s.nWaitingReqs), int64(len(s.reqChan)))
287+
s.reportWaitingRequests()
288+
284289
req := reqCtx.completionReq
285290
model := req.getModel()
286291
if s.isLora(model) {
@@ -301,7 +306,7 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
301306
s.reportLoras()
302307
}
303308
atomic.AddInt64(&(s.nRunningReqs), 1)
304-
s.reportRequests()
309+
s.reportRunningRequests()
305310

306311
responseTxt := req.createResponseText(s.mode)
307312

@@ -319,7 +324,7 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
319324
func (s *VllmSimulator) responseSentCallback(model string) {
320325

321326
atomic.AddInt64(&(s.nRunningReqs), -1)
322-
s.reportRequests()
327+
s.reportRunningRequests()
323328

324329
if model == s.model {
325330
// this is base model - do not continue

0 commit comments

Comments
 (0)