From 10d5088e1bcfc566525cb3dc1eb23e077d203a88 Mon Sep 17 00:00:00 2001 From: Maya Barnea Date: Mon, 14 Jul 2025 15:43:35 +0300 Subject: [PATCH 1/7] Add P/D support, respond accordingly to doRemotePrefill and doRemoteDecode fields Signed-off-by: Maya Barnea --- pkg/llm-d-inference-sim/config.go | 6 +++ pkg/llm-d-inference-sim/request.go | 20 ++++++++- pkg/llm-d-inference-sim/response.go | 8 +++- pkg/llm-d-inference-sim/simulator.go | 63 +++++++++++++++++++++++----- pkg/llm-d-inference-sim/streaming.go | 10 ++--- 5 files changed, 89 insertions(+), 18 deletions(-) diff --git a/pkg/llm-d-inference-sim/config.go b/pkg/llm-d-inference-sim/config.go index e5e6999..68ffaff 100644 --- a/pkg/llm-d-inference-sim/config.go +++ b/pkg/llm-d-inference-sim/config.go @@ -53,6 +53,9 @@ type configuration struct { TimeToFirstToken int `yaml:"time-to-first-token"` // InterTokenLatency time between generated tokens, in milliseconds InterTokenLatency int `yaml:"inter-token-latency"` + // KVCacheTransferLatency time to "transfer" kv-cache from another vLLM instance in case P/D is activated, in milliseconds + KVCacheTransferLatency int `yaml:"kv_cache_transfer_latency"` + // Mode defines the simulator response generation mode, valid values: echo, random Mode string `yaml:"mode"` // Seed defines random seed for operations @@ -145,6 +148,9 @@ func (c *configuration) validate() error { if c.TimeToFirstToken < 0 { return errors.New("time to first token cannot be negative") } + if c.KVCacheTransferLatency < 0 { + return errors.New("kv-cache tranfer time cannot be negative") + } if c.MaxLoras < 1 { return errors.New("max LoRAs cannot be less than 1") } diff --git a/pkg/llm-d-inference-sim/request.go b/pkg/llm-d-inference-sim/request.go index 4ebfecb..8506c13 100644 --- a/pkg/llm-d-inference-sim/request.go +++ b/pkg/llm-d-inference-sim/request.go @@ -44,6 +44,10 @@ type completionRequest interface { getToolChoice() string // getMaxCompletionTokens returns the maximum completion tokens requested getMaxCompletionTokens() *int64 + // isDoRemoteDecode() returns true is do_remote_decode is true in the request, this means that this is prefill request + doRemoteDecode() bool + // isDoRemotePrefill() returns true is do_remote_prefill is true in the request, this means that this is decode request + doRemotePrefill() bool } // baseCompletionRequest contains base completion request related information @@ -53,7 +57,13 @@ type baseCompletionRequest struct { // StreamOptions defines streaming options in case Stream is set to true StreamOptions streamOptions `json:"stream_options"` // Model defines Model name to use for "inference", could be base Model name or one of available LoRA adapters - Model string `json:"model"` + Model string `json:"model"` + DoRemoteDecode bool `json:"do_remote_decode"` + DoRemotePrefill bool `json:"do_remote_prefill"` + RemoteBlockIds []string `json:"remote_block_ids"` + RemoteEngineId string `json:"remote_engine_id"` + RemoteHost string `json:"remote_host"` + RemotePort int `json:"remote_port"` } // StreamOptions defines streaming options for streaming requests @@ -74,6 +84,14 @@ func (b *baseCompletionRequest) includeUsage() bool { return !b.Stream || b.StreamOptions.IncludeUsage } +func (b *baseCompletionRequest) doRemoteDecode() bool { + return b.DoRemoteDecode +} + +func (b *baseCompletionRequest) doRemotePrefill() bool { + return b.DoRemotePrefill +} + // completionReqCtx is a context passed in the simulator's flow, it contains the request data needed // to generate the simulator's response type 
completionReqCtx struct { diff --git a/pkg/llm-d-inference-sim/response.go b/pkg/llm-d-inference-sim/response.go index 08dcacf..b7fd766 100644 --- a/pkg/llm-d-inference-sim/response.go +++ b/pkg/llm-d-inference-sim/response.go @@ -37,7 +37,13 @@ type baseCompletionResponse struct { // Usage contains the token usage statistics for the request Usage *usage `json:"usage"` // Object is the Object type, "text_completion", "chat.completion", or "chat.completion.chunk" - Object string `json:"object"` + Object string `json:"object"` + DoRemoteDecode bool `json:"do_remote_decode"` + DoRemotePrefill bool `json:"do_remote_prefill"` + RemoteBlockIds []string `json:"remote_block_ids"` + RemoteEngineId string `json:"remote_engine_id"` + RemoteHost string `json:"remote_host"` + RemotePort int `json:"remote_port"` } // usage contains token usage statistics diff --git a/pkg/llm-d-inference-sim/simulator.go b/pkg/llm-d-inference-sim/simulator.go index 1ccb7ad..629d50f 100644 --- a/pkg/llm-d-inference-sim/simulator.go +++ b/pkg/llm-d-inference-sim/simulator.go @@ -49,6 +49,7 @@ const ( stopFinishReason = "stop" lengthFinishReason = "length" toolsFinishReason = "tool_calls" + remoteDecodeFinishReason = "remote_decode" roleAssistant = "assistant" roleUser = "user" textCompletionObject = "text_completion" @@ -155,6 +156,7 @@ func (s *VllmSimulator) parseCommandParamsAndLoadConfig() error { f.StringVar(&config.Mode, "mode", config.Mode, "Simulator mode, echo - returns the same text that was sent in the request, for chat completion returns the last message, random - returns random sentence from a bank of pre-defined sentences") f.IntVar(&config.InterTokenLatency, "inter-token-latency", config.InterTokenLatency, "Time to generate one token (in milliseconds)") f.IntVar(&config.TimeToFirstToken, "time-to-first-token", config.TimeToFirstToken, "Time to first token (in milliseconds)") + f.IntVar(&config.KVCacheTransferLatency, "kv_cache_transfer_latency", config.KVCacheTransferLatency, "Time for KV-cache transfer from a remote vLLM (in milliseconds)") f.Int64Var(&config.Seed, "seed", config.Seed, "Random seed for operations (if not set, current Unix time in nanoseconds is used)") // These values were manually parsed above in getParamValueFromArgs, we leave this in order to get these flags in --help @@ -304,6 +306,8 @@ func (s *VllmSimulator) readRequest(ctx *fasthttp.RequestCtx, isChatCompletion b var req textCompletionRequest err := json.Unmarshal(ctx.Request.Body(), &req) + + fmt.Printf("Unmarshaled text request: %#v\n", req) return &req, err } @@ -329,6 +333,18 @@ func (s *VllmSimulator) HandleUnloadLora(ctx *fasthttp.RequestCtx) { s.unloadLora(ctx) } +func (s *VllmSimulator) validateRequest(req completionRequest) (string, string, int) { + if !s.isValidModel(req.getModel()) { + return fmt.Sprintf("The model `%s` does not exist.", req.getModel()), "NotFoundError", fasthttp.StatusNotFound + } + + if req.doRemoteDecode() && req.isStream() { + return "Prefill does not support streaming", "Invalid request", fasthttp.StatusBadRequest + } + + return "", "", fasthttp.StatusOK +} + // isValidModel checks if the given model is the base model or one of "loaded" LoRAs func (s *VllmSimulator) isValidModel(model string) bool { for _, name := range s.config.ServedModelNames { @@ -365,11 +381,9 @@ func (s *VllmSimulator) handleCompletions(ctx *fasthttp.RequestCtx, isChatComple return } - model := vllmReq.getModel() - - if !s.isValidModel(model) { - s.sendCompletionError(ctx, fmt.Sprintf("The model `%s` does not exist.", 
vllmReq.getModel()), - "NotFoundError", fasthttp.StatusNotFound) + errMsg, errType, errCode := s.validateRequest(vllmReq) + if errMsg != "" { + s.sendCompletionError(ctx, errMsg, errType, errCode) return } @@ -477,16 +491,23 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) { isChatCompletion: reqCtx.isChatCompletion, model: displayModel, }, - responseTokens, toolCalls, finishReason, usageDataToSend, + responseTokens, toolCalls, finishReason, usageDataToSend, req.doRemotePrefill(), ) } else { + if req.doRemoteDecode() { + // in case this is prefill pod processing, return special finish reason + finishReason = remoteDecodeFinishReason + } + s.sendResponse(reqCtx.isChatCompletion, reqCtx.httpReqCtx, responseTokens, toolCalls, displayModel, finishReason, - &usageData) + &usageData, + req.doRemoteDecode(), + req.doRemotePrefill()) } } reqCtx.wg.Done() @@ -575,13 +596,25 @@ func (s *VllmSimulator) HandleError(_ *fasthttp.RequestCtx, err error) { // modelName - display name returned to the client and used in metrics. It is either the first alias // from --served-model-name (for a base-model request) or the LoRA adapter name (for a LoRA request). func (s *VllmSimulator) createCompletionResponse(isChatCompletion bool, respTokens []string, toolCalls []toolCall, - finishReason *string, usageData *usage, modelName string) completionResponse { + finishReason *string, usageData *usage, modelName string, doRemoteDecode bool) completionResponse { baseResp := baseCompletionResponse{ ID: chatComplIDPrefix + uuid.NewString(), Created: time.Now().Unix(), Model: modelName, Usage: usageData, } + + if doRemoteDecode { + // add special fields related to the prefill pod special behavior + baseResp.DoRemoteDecode = true + baseResp.DoRemotePrefill = false + // currently remote prefill information is hard-coded + baseResp.RemoteBlockIds = []string{"DUMMY_ID"} + baseResp.RemoteEngineId = "DUMMY_ID" + baseResp.RemoteHost = "DUMMY" + baseResp.RemotePort = 1234 + } + baseChoice := baseResponseChoice{Index: 0, FinishReason: finishReason} respText := strings.Join(respTokens, "") @@ -616,8 +649,8 @@ func (s *VllmSimulator) createCompletionResponse(isChatCompletion bool, respToke // finishReason - a pointer to string that represents finish reason, can be nil, stop, length, or tools // usageData - usage (tokens statistics) for this response func (s *VllmSimulator) sendResponse(isChatCompletion bool, ctx *fasthttp.RequestCtx, respTokens []string, toolCalls []toolCall, - modelName string, finishReason string, usageData *usage) { - resp := s.createCompletionResponse(isChatCompletion, respTokens, toolCalls, &finishReason, usageData, modelName) + modelName string, finishReason string, usageData *usage, doRemoteDecode bool, doRemotePrefill bool) { + resp := s.createCompletionResponse(isChatCompletion, respTokens, toolCalls, &finishReason, usageData, modelName, doRemoteDecode) data, err := json.Marshal(resp) if err != nil { @@ -627,7 +660,7 @@ func (s *VllmSimulator) sendResponse(isChatCompletion bool, ctx *fasthttp.Reques // calculate how long to wait before returning the response, time is based on number of tokens numOfTokens := usageData.CompletionTokens - totalMillisToWait := s.config.TimeToFirstToken + (numOfTokens-1)*s.config.InterTokenLatency + totalMillisToWait := s.getTimeToFirstToken(doRemotePrefill) + (numOfTokens-1)*s.config.InterTokenLatency time.Sleep(time.Duration(totalMillisToWait) * time.Millisecond) // TODO - maybe add pod id to response header for testing @@ -638,6 +671,14 @@ func (s 
*VllmSimulator) sendResponse(isChatCompletion bool, ctx *fasthttp.Reques s.responseSentCallback(modelName) } +// returns time to first token based on whether +func (s *VllmSimulator) getTimeToFirstToken(doRemotePrefill bool) int { + if doRemotePrefill { + return s.config.KVCacheTransferLatency + } + return s.config.TimeToFirstToken +} + // createModelsResponse creates and returns ModelResponse for the current state, returned array of models contains the base model + LoRA adapters if exist func (s *VllmSimulator) createModelsResponse() *vllmapi.ModelsResponse { modelsResp := vllmapi.ModelsResponse{Object: "list", Data: []vllmapi.ModelsResponseModelInfo{}} diff --git a/pkg/llm-d-inference-sim/streaming.go b/pkg/llm-d-inference-sim/streaming.go index efeb9f2..1863e1c 100644 --- a/pkg/llm-d-inference-sim/streaming.go +++ b/pkg/llm-d-inference-sim/streaming.go @@ -38,7 +38,7 @@ type streamingContext struct { // response content is wrapped according SSE format // First token is send after timeToFirstToken milliseconds, every other token is sent after interTokenLatency milliseconds func (s *VllmSimulator) sendStreamingResponse(context *streamingContext, responseTokens []string, toolCalls []toolCall, - finishReason string, usageData *usage) { + finishReason string, usageData *usage, doRemotePrefill bool) { context.ctx.SetContentType("text/event-stream") context.ctx.SetStatusCode(fasthttp.StatusOK) @@ -57,11 +57,11 @@ func (s *VllmSimulator) sendStreamingResponse(context *streamingContext, respons if len(toolCalls) > 0 { s.logger.Info("Going to send tools calls") for _, tc := range toolCalls { - s.sendTokenChunks(context, w, tc.Function.tokenizedArguments, &tc, finishReason) + s.sendTokenChunks(context, w, tc.Function.tokenizedArguments, &tc, finishReason, doRemotePrefill) } } else { s.logger.Info("Going to send text", "number of tokens", usageData.CompletionTokens) - s.sendTokenChunks(context, w, responseTokens, nil, finishReason) + s.sendTokenChunks(context, w, responseTokens, nil, finishReason, doRemotePrefill) } } @@ -84,9 +84,9 @@ func (s *VllmSimulator) sendStreamingResponse(context *streamingContext, respons } // sendTokenChunks creates and sends response chunks -func (s *VllmSimulator) sendTokenChunks(context *streamingContext, w *bufio.Writer, tokens []string, tc *toolCall, finishReason string) { +func (s *VllmSimulator) sendTokenChunks(context *streamingContext, w *bufio.Writer, tokens []string, tc *toolCall, finishReason string, doRemotePrefill bool) { // time to first token delay - time.Sleep(time.Duration(s.config.TimeToFirstToken) * time.Millisecond) + time.Sleep(time.Duration(s.getTimeToFirstToken(doRemotePrefill)) * time.Millisecond) for i, token := range tokens { if i != 0 { From 276f15ab35fa3f9cc43e5788b655c876109c8c67 Mon Sep 17 00:00:00 2001 From: Maya Barnea Date: Mon, 14 Jul 2025 15:46:50 +0300 Subject: [PATCH 2/7] Add test for kvcache transfer time command line parameter. 
Update config_test to use a function to create configuration same as defined in the config yaml file Signed-off-by: Maya Barnea --- manifests/basic-config.yaml | 8 ++ manifests/config.yaml | 5 +- pkg/llm-d-inference-sim/config_test.go | 100 ++++++++++++------------- 3 files changed, 57 insertions(+), 56 deletions(-) create mode 100644 manifests/basic-config.yaml diff --git a/manifests/basic-config.yaml b/manifests/basic-config.yaml new file mode 100644 index 0000000..f692544 --- /dev/null +++ b/manifests/basic-config.yaml @@ -0,0 +1,8 @@ +port: 8001 +model: "Qwen/Qwen2-0.5B" +max-num-seqs: 5 +mode: "random" +time-to-first-token: 2000 +inter-token-latency: 1000 +kv_cache_transfer_latency: 100 +seed: 100100100 diff --git a/manifests/config.yaml b/manifests/config.yaml index facdf8c..632dd69 100644 --- a/manifests/config.yaml +++ b/manifests/config.yaml @@ -10,6 +10,7 @@ lora-modules: - '{"name":"lora1","path":"/path/to/lora1"}' - '{"name":"lora2","path":"/path/to/lora2"}' mode: "random" -time-to-first-token: 2 -inter-token-latency: 1 +time-to-first-token: 2000 +inter-token-latency: 1000 +kv_cache_transfer_latency: 100 seed: 100100100 diff --git a/pkg/llm-d-inference-sim/config_test.go b/pkg/llm-d-inference-sim/config_test.go index 977aa3c..815b7a7 100644 --- a/pkg/llm-d-inference-sim/config_test.go +++ b/pkg/llm-d-inference-sim/config_test.go @@ -25,8 +25,7 @@ import ( ) const ( - qwenModelName = "Qwen/Qwen2-0.5B" - seedInConfigFile = 100100100 + qwenModelName = "Qwen/Qwen2-0.5B" ) func createSimConfig(args []string) (*configuration, error) { @@ -46,6 +45,33 @@ func createSimConfig(args []string) (*configuration, error) { return s.config, nil } +func createDefaultBasicConfig(model string) *configuration { + c := newConfig() + + c.Model = model + c.ServedModelNames = []string{c.Model} + c.MaxNumSeqs = 5 + c.MaxLoras = 1 + c.MaxCPULoras = 1 + c.TimeToFirstToken = 2000 + c.InterTokenLatency = 1000 + c.KVCacheTransferLatency = 100 + c.Seed = 100100100 + c.LoraModules = []loraModule{} + + return c +} + +func createDefaultConfig(model string) *configuration { + c := createDefaultBasicConfig(model) + + // parameters special to config.yaml + c.MaxLoras = 2 + c.MaxCPULoras = 5 + + return c +} + type testCase struct { name string args []string @@ -69,17 +95,10 @@ var _ = Describe("Simulator configuration", func() { tests = append(tests, test) // Config from config.yaml file - c = newConfig() + c = createDefaultConfig(qwenModelName) c.Port = 8001 - c.Model = qwenModelName c.ServedModelNames = []string{"model1", "model2"} - c.MaxLoras = 2 - c.MaxCPULoras = 5 - c.MaxNumSeqs = 5 - c.TimeToFirstToken = 2 - c.InterTokenLatency = 1 c.LoraModules = []loraModule{{Name: "lora1", Path: "/path/to/lora1"}, {Name: "lora2", Path: "/path/to/lora2"}} - c.Seed = seedInConfigFile test = testCase{ name: "config file", args: []string{"cmd", "--config", "../../manifests/config.yaml"}, @@ -92,15 +111,9 @@ var _ = Describe("Simulator configuration", func() { tests = append(tests, test) // Config from config.yaml file plus command line args - c = newConfig() + c = createDefaultConfig(model) c.Port = 8002 - c.Model = model c.ServedModelNames = []string{"alias1", "alias2"} - c.MaxLoras = 2 - c.MaxCPULoras = 5 - c.MaxNumSeqs = 5 - c.TimeToFirstToken = 2 - c.InterTokenLatency = 1 c.Seed = 100 c.LoraModules = []loraModule{{Name: "lora3", Path: "/path/to/lora3"}, {Name: "lora4", Path: "/path/to/lora4"}} c.LoraModulesString = []string{ @@ -118,16 +131,8 @@ var _ = Describe("Simulator configuration", func() { tests = append(tests, 
test) // Config from config.yaml file plus command line args with different format - c = newConfig() + c = createDefaultConfig(model) c.Port = 8002 - c.Model = model - c.ServedModelNames = []string{c.Model} - c.MaxLoras = 2 - c.MaxCPULoras = 5 - c.MaxNumSeqs = 5 - c.TimeToFirstToken = 2 - c.InterTokenLatency = 1 - c.Seed = seedInConfigFile c.LoraModules = []loraModule{{Name: "lora3", Path: "/path/to/lora3"}} c.LoraModulesString = []string{ "{\"name\":\"lora3\",\"path\":\"/path/to/lora3\"}", @@ -143,16 +148,8 @@ var _ = Describe("Simulator configuration", func() { tests = append(tests, test) // Config from config.yaml file plus command line args with empty string - c = newConfig() + c = createDefaultConfig(model) c.Port = 8002 - c.Model = model - c.ServedModelNames = []string{c.Model} - c.MaxLoras = 2 - c.MaxCPULoras = 5 - c.MaxNumSeqs = 5 - c.TimeToFirstToken = 2 - c.InterTokenLatency = 1 - c.Seed = seedInConfigFile c.LoraModules = []loraModule{{Name: "lora3", Path: "/path/to/lora3"}} c.LoraModulesString = []string{ "{\"name\":\"lora3\",\"path\":\"/path/to/lora3\"}", @@ -168,18 +165,10 @@ var _ = Describe("Simulator configuration", func() { tests = append(tests, test) // Config from config.yaml file plus command line args with empty string for loras - c = newConfig() + c = createDefaultConfig(qwenModelName) c.Port = 8001 - c.Model = qwenModelName c.ServedModelNames = []string{"model1", "model2"} - c.MaxLoras = 2 - c.MaxCPULoras = 5 - c.MaxNumSeqs = 5 - c.TimeToFirstToken = 2 - c.InterTokenLatency = 1 - c.LoraModules = []loraModule{} c.LoraModulesString = []string{} - c.Seed = seedInConfigFile test = testCase{ name: "config file with command line args with empty string for loras", args: []string{"cmd", "--config", "../../manifests/config.yaml", "--lora-modules", ""}, @@ -188,18 +177,10 @@ var _ = Describe("Simulator configuration", func() { tests = append(tests, test) // Config from config.yaml file plus command line args with empty parameter for loras - c = newConfig() + c = createDefaultConfig(qwenModelName) c.Port = 8001 - c.Model = qwenModelName c.ServedModelNames = []string{"model1", "model2"} - c.MaxLoras = 2 - c.MaxCPULoras = 5 - c.MaxNumSeqs = 5 - c.TimeToFirstToken = 2 - c.InterTokenLatency = 1 - c.LoraModules = []loraModule{} c.LoraModulesString = []string{} - c.Seed = seedInConfigFile test = testCase{ name: "config file with command line args with empty parameter for loras", args: []string{"cmd", "--config", "../../manifests/config.yaml", "--lora-modules"}, @@ -207,6 +188,17 @@ var _ = Describe("Simulator configuration", func() { } tests = append(tests, test) + // Config from config.yaml file plus command line args with time to copy cache + c = createDefaultBasicConfig(qwenModelName) + c.Port = 8001 + c.KVCacheTransferLatency = 50 + test = testCase{ + name: "config file with command line args with time to transfer kv-cache", + args: []string{"cmd", "--config", "../../manifests/basic-config.yaml", "--kv_cache_transfer_latency", "50"}, + expectedConfig: c, + } + tests = append(tests, test) + // Invalid configurations test = testCase{ name: "invalid model", @@ -258,6 +250,7 @@ var _ = Describe("Simulator configuration", func() { Entry(tests[4].name, tests[4].args, tests[4].expectedConfig), Entry(tests[5].name, tests[5].args, tests[5].expectedConfig), Entry(tests[6].name, tests[6].args, tests[6].expectedConfig), + Entry(tests[7].name, tests[7].args, tests[7].expectedConfig), ) DescribeTable("invalid configurations", @@ -265,7 +258,6 @@ var _ = Describe("Simulator configuration", 
func() { _, err := createSimConfig(args) Expect(err).To(HaveOccurred()) }, - Entry(tests[7].name, tests[7].args), Entry(tests[8].name, tests[8].args), Entry(tests[9].name, tests[9].args), Entry(tests[10].name, tests[10].args), From 626c432fca3d6d3ac9a0efc292fc6b37a8ce6f20 Mon Sep 17 00:00:00 2001 From: Maya Barnea Date: Mon, 14 Jul 2025 16:06:47 +0300 Subject: [PATCH 3/7] Update readme file change command line argument name to 'kv-cache-transfer-latency' Signed-off-by: Maya Barnea --- README.md | 5 +++-- manifests/basic-config.yaml | 2 +- manifests/config.yaml | 2 +- pkg/llm-d-inference-sim/config.go | 2 +- pkg/llm-d-inference-sim/config_test.go | 2 +- pkg/llm-d-inference-sim/simulator.go | 2 +- 6 files changed, 8 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 5f4a18e..c7a13a9 100644 --- a/README.md +++ b/README.md @@ -29,9 +29,9 @@ The simulator supports two modes of operation: - `echo` mode: the response contains the same text that was received in the request. For `/v1/chat/completions` the last message for the role=`user` is used. - `random` mode: the response is randomly chosen from a set of pre-defined sentences. -Timing of the response is defined by two parameters: `time-to-first-token` and `inter-token-latency`. +Timing of the response is defined by `time-to-first-token` and `inter-token-latency` parameters. In case P/D is enabled for a request, `kv-cache-transfer-latency` will be used instead of `time-to-first-token`. -For a request with `stream=true`: `time-to-first-token` defines the delay before the first token is returned, `inter-token-latency` defines the delay between subsequent tokens in the stream. +For a request with `stream=true`: `time-to-first-token` or `kv-cache-transfer-latency` defines the delay before the first token is returned, `inter-token-latency` defines the delay between subsequent tokens in the stream. 
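As an aside on the streaming timing just described: the sketch below mirrors the `getTimeToFirstToken` helper and the per-token sleeps in `sendTokenChunks` introduced by this patch series. It is a minimal stand-alone approximation rather than code taken from the simulator; `timingConfig`, `streamTokens`, and `emitToken` are illustrative names, and the latency values are the ones set in `manifests/config.yaml` in this series.

```go
package main

import (
	"fmt"
	"time"
)

type timingConfig struct {
	TimeToFirstToken       int // milliseconds
	InterTokenLatency      int // milliseconds
	KVCacheTransferLatency int // milliseconds, used when prefill ran on a remote pod
}

// timeToFirstToken mirrors getTimeToFirstToken from this series: when the
// kv-cache was computed by a remote prefill pod (do_remote_prefill=true),
// the first-token delay models the kv-cache transfer instead.
func timeToFirstToken(c timingConfig, doRemotePrefill bool) int {
	if doRemotePrefill {
		return c.KVCacheTransferLatency
	}
	return c.TimeToFirstToken
}

// streamTokens applies the delays the way sendTokenChunks does: wait for the
// first-token delay, then wait interTokenLatency before each subsequent token.
func streamTokens(c timingConfig, tokens []string, doRemotePrefill bool, emitToken func(string)) {
	time.Sleep(time.Duration(timeToFirstToken(c, doRemotePrefill)) * time.Millisecond)
	for i, tok := range tokens {
		if i != 0 {
			time.Sleep(time.Duration(c.InterTokenLatency) * time.Millisecond)
		}
		emitToken(tok)
	}
}

func main() {
	// Latency values from manifests/config.yaml in this series.
	c := timingConfig{TimeToFirstToken: 2000, InterTokenLatency: 1000, KVCacheTransferLatency: 100}
	streamTokens(c, []string{"Hello", " world"}, true, func(t string) { fmt.Println(t) })
}
```

With these values, a stream whose prefill ran on a remote pod starts emitting after roughly 100 ms instead of 2000 ms, while the 1000 ms gap between subsequent tokens is unchanged.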
For a requst with `stream=false`: the response is returned after delay of ` + ( * ( - 1))` @@ -99,6 +99,7 @@ For more details see the Date: Tue, 15 Jul 2025 09:17:23 +0300 Subject: [PATCH 4/7] fixes according PR's comments Signed-off-by: Maya Barnea --- pkg/llm-d-inference-sim/config_test.go | 22 ++++++++-------------- pkg/llm-d-inference-sim/request.go | 24 +++++++++++++++--------- pkg/llm-d-inference-sim/response.go | 18 +++++++++++------- pkg/llm-d-inference-sim/simulator.go | 8 ++++---- pkg/llm-d-inference-sim/streaming.go | 11 ++++++----- 5 files changed, 44 insertions(+), 39 deletions(-) diff --git a/pkg/llm-d-inference-sim/config_test.go b/pkg/llm-d-inference-sim/config_test.go index f9c25c1..352ccc0 100644 --- a/pkg/llm-d-inference-sim/config_test.go +++ b/pkg/llm-d-inference-sim/config_test.go @@ -45,14 +45,14 @@ func createSimConfig(args []string) (*configuration, error) { return s.config, nil } -func createDefaultBasicConfig(model string) *configuration { +func createDefaultConfig(model string) *configuration { c := newConfig() c.Model = model c.ServedModelNames = []string{c.Model} c.MaxNumSeqs = 5 - c.MaxLoras = 1 - c.MaxCPULoras = 1 + c.MaxLoras = 2 + c.MaxCPULoras = 5 c.TimeToFirstToken = 2000 c.InterTokenLatency = 1000 c.KVCacheTransferLatency = 100 @@ -62,16 +62,6 @@ func createDefaultBasicConfig(model string) *configuration { return c } -func createDefaultConfig(model string) *configuration { - c := createDefaultBasicConfig(model) - - // parameters special to config.yaml - c.MaxLoras = 2 - c.MaxCPULoras = 5 - - return c -} - type testCase struct { name string args []string @@ -189,8 +179,11 @@ var _ = Describe("Simulator configuration", func() { tests = append(tests, test) // Config from config.yaml file plus command line args with time to copy cache - c = createDefaultBasicConfig(qwenModelName) + c = createDefaultConfig(qwenModelName) c.Port = 8001 + // basic config file does not contain properties related to lora + c.MaxLoras = 1 + c.MaxCPULoras = 1 c.KVCacheTransferLatency = 50 test = testCase{ name: "config file with command line args with time to transfer kv-cache", @@ -263,5 +256,6 @@ var _ = Describe("Simulator configuration", func() { Entry(tests[10].name, tests[10].args), Entry(tests[11].name, tests[11].args), Entry(tests[12].name, tests[12].args), + Entry(tests[13].name, tests[13].args), ) }) diff --git a/pkg/llm-d-inference-sim/request.go b/pkg/llm-d-inference-sim/request.go index 8506c13..ecee6f4 100644 --- a/pkg/llm-d-inference-sim/request.go +++ b/pkg/llm-d-inference-sim/request.go @@ -44,9 +44,9 @@ type completionRequest interface { getToolChoice() string // getMaxCompletionTokens returns the maximum completion tokens requested getMaxCompletionTokens() *int64 - // isDoRemoteDecode() returns true is do_remote_decode is true in the request, this means that this is prefill request + // doRemoteDecode() returns true if do_remote_decode field is true in the request, this means that this is prefill request doRemoteDecode() bool - // isDoRemotePrefill() returns true is do_remote_prefill is true in the request, this means that this is decode request + // doRemotePrefill() returns true if do_remote_prefill field is true in the request, this means that this is decode request doRemotePrefill() bool } @@ -57,13 +57,19 @@ type baseCompletionRequest struct { // StreamOptions defines streaming options in case Stream is set to true StreamOptions streamOptions `json:"stream_options"` // Model defines Model name to use for "inference", could be base Model name or one of 
available LoRA adapters - Model string `json:"model"` - DoRemoteDecode bool `json:"do_remote_decode"` - DoRemotePrefill bool `json:"do_remote_prefill"` - RemoteBlockIds []string `json:"remote_block_ids"` - RemoteEngineId string `json:"remote_engine_id"` - RemoteHost string `json:"remote_host"` - RemotePort int `json:"remote_port"` + Model string `json:"model"` + // DoRemoteDecode boolean value, true when request's decode will be done on remote pod + DoRemoteDecode bool `json:"do_remote_decode"` + // DoRemotePrefill boolean value, true when request's prefill was done on remote pod + DoRemotePrefill bool `json:"do_remote_prefill"` + // RemoteBlockIds is a list of block identifiers to process remotely for distributed decoding + RemoteBlockIds []string `json:"remote_block_ids"` + // RemoteEngineId is an identifier of the remote inference engine or backend to use for processing requests + RemoteEngineId string `json:"remote_engine_id"` + // RemoteHost is a hostname or IP address of the remote server handling prefill + RemoteHost string `json:"remote_host"` + // RemotePort is a port of the remote server handling prefill + RemotePort int `json:"remote_port"` } // StreamOptions defines streaming options for streaming requests diff --git a/pkg/llm-d-inference-sim/response.go b/pkg/llm-d-inference-sim/response.go index b7fd766..133cfc9 100644 --- a/pkg/llm-d-inference-sim/response.go +++ b/pkg/llm-d-inference-sim/response.go @@ -37,13 +37,17 @@ type baseCompletionResponse struct { // Usage contains the token usage statistics for the request Usage *usage `json:"usage"` // Object is the Object type, "text_completion", "chat.completion", or "chat.completion.chunk" - Object string `json:"object"` - DoRemoteDecode bool `json:"do_remote_decode"` - DoRemotePrefill bool `json:"do_remote_prefill"` - RemoteBlockIds []string `json:"remote_block_ids"` - RemoteEngineId string `json:"remote_engine_id"` - RemoteHost string `json:"remote_host"` - RemotePort int `json:"remote_port"` + Object string `json:"object"` + DoRemoteDecode bool `json:"do_remote_decode"` + DoRemotePrefill bool `json:"do_remote_prefill"` + // RemoteBlockIds is a list of block identifiers to process remotely for distributed decoding + RemoteBlockIds []string `json:"remote_block_ids"` + // RemoteEngineId is an identifier of the remote inference engine or backend to use for processing requests + RemoteEngineId string `json:"remote_engine_id"` + // RemoteHost is a hostname or IP address of the remote server handling prefill + RemoteHost string `json:"remote_host"` + // RemotePort is a port of the remote server handling prefill + RemotePort int `json:"remote_port"` } // usage contains token usage statistics diff --git a/pkg/llm-d-inference-sim/simulator.go b/pkg/llm-d-inference-sim/simulator.go index 6efe795..239191b 100644 --- a/pkg/llm-d-inference-sim/simulator.go +++ b/pkg/llm-d-inference-sim/simulator.go @@ -303,11 +303,10 @@ func (s *VllmSimulator) readRequest(ctx *fasthttp.RequestCtx, isChatCompletion b return &req, nil } - var req textCompletionRequest + var req textCompletionRequest err := json.Unmarshal(ctx.Request.Body(), &req) - fmt.Printf("Unmarshaled text request: %#v\n", req) return &req, err } @@ -490,8 +489,9 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) { ctx: reqCtx.httpReqCtx, isChatCompletion: reqCtx.isChatCompletion, model: displayModel, + doRemotePrefill: req.doRemotePrefill(), }, - responseTokens, toolCalls, finishReason, usageDataToSend, req.doRemotePrefill(), + responseTokens, toolCalls, 
finishReason, usageDataToSend, ) } else { if req.doRemoteDecode() { @@ -671,7 +671,7 @@ func (s *VllmSimulator) sendResponse(isChatCompletion bool, ctx *fasthttp.Reques s.responseSentCallback(modelName) } -// returns time to first token based on whether +// returns time to first token based on the current request's doRemotePrefill func (s *VllmSimulator) getTimeToFirstToken(doRemotePrefill bool) int { if doRemotePrefill { return s.config.KVCacheTransferLatency diff --git a/pkg/llm-d-inference-sim/streaming.go b/pkg/llm-d-inference-sim/streaming.go index 1863e1c..f19efa6 100644 --- a/pkg/llm-d-inference-sim/streaming.go +++ b/pkg/llm-d-inference-sim/streaming.go @@ -31,6 +31,7 @@ type streamingContext struct { isChatCompletion bool model string creationTime int64 + doRemotePrefill bool } // sendStreamingResponse creates and sends a streaming response for completion requests of both types (text and chat) @@ -38,7 +39,7 @@ type streamingContext struct { // response content is wrapped according SSE format // First token is send after timeToFirstToken milliseconds, every other token is sent after interTokenLatency milliseconds func (s *VllmSimulator) sendStreamingResponse(context *streamingContext, responseTokens []string, toolCalls []toolCall, - finishReason string, usageData *usage, doRemotePrefill bool) { + finishReason string, usageData *usage) { context.ctx.SetContentType("text/event-stream") context.ctx.SetStatusCode(fasthttp.StatusOK) @@ -57,11 +58,11 @@ func (s *VllmSimulator) sendStreamingResponse(context *streamingContext, respons if len(toolCalls) > 0 { s.logger.Info("Going to send tools calls") for _, tc := range toolCalls { - s.sendTokenChunks(context, w, tc.Function.tokenizedArguments, &tc, finishReason, doRemotePrefill) + s.sendTokenChunks(context, w, tc.Function.tokenizedArguments, &tc, finishReason) } } else { s.logger.Info("Going to send text", "number of tokens", usageData.CompletionTokens) - s.sendTokenChunks(context, w, responseTokens, nil, finishReason, doRemotePrefill) + s.sendTokenChunks(context, w, responseTokens, nil, finishReason) } } @@ -84,9 +85,9 @@ func (s *VllmSimulator) sendStreamingResponse(context *streamingContext, respons } // sendTokenChunks creates and sends response chunks -func (s *VllmSimulator) sendTokenChunks(context *streamingContext, w *bufio.Writer, tokens []string, tc *toolCall, finishReason string, doRemotePrefill bool) { +func (s *VllmSimulator) sendTokenChunks(context *streamingContext, w *bufio.Writer, tokens []string, tc *toolCall, finishReason string) { // time to first token delay - time.Sleep(time.Duration(s.getTimeToFirstToken(doRemotePrefill)) * time.Millisecond) + time.Sleep(time.Duration(s.getTimeToFirstToken(context.doRemotePrefill)) * time.Millisecond) for i, token := range tokens { if i != 0 { From 651c3b0b67f77814154788d454335a96bceb71eb Mon Sep 17 00:00:00 2001 From: Maya Barnea Date: Tue, 15 Jul 2025 09:59:43 +0300 Subject: [PATCH 5/7] added comments for fields Signed-off-by: Maya Barnea --- pkg/llm-d-inference-sim/response.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/llm-d-inference-sim/response.go b/pkg/llm-d-inference-sim/response.go index 133cfc9..c349c0f 100644 --- a/pkg/llm-d-inference-sim/response.go +++ b/pkg/llm-d-inference-sim/response.go @@ -37,9 +37,11 @@ type baseCompletionResponse struct { // Usage contains the token usage statistics for the request Usage *usage `json:"usage"` // Object is the Object type, "text_completion", "chat.completion", or "chat.completion.chunk" - Object 
string `json:"object"` - DoRemoteDecode bool `json:"do_remote_decode"` - DoRemotePrefill bool `json:"do_remote_prefill"` + Object string `json:"object"` + // DoRemoteDecode boolean value, true when request's decode will be done on remote pod + DoRemoteDecode bool `json:"do_remote_decode"` + // DoRemotePrefill boolean value, true when request's prefill was done on remote pod + DoRemotePrefill bool `json:"do_remote_prefill"` // RemoteBlockIds is a list of block identifiers to process remotely for distributed decoding RemoteBlockIds []string `json:"remote_block_ids"` // RemoteEngineId is an identifier of the remote inference engine or backend to use for processing requests RemoteEngineId string `json:"remote_engine_id"` // RemoteHost is a hostname or IP address of the remote server handling prefill RemoteHost string `json:"remote_host"` // RemotePort is a port of the remote server handling prefill RemotePort int `json:"remote_port"` } // usage contains token usage statistics From d061f7211ca437643372a1a54605f9dc98ab7d1a Mon Sep 17 00:00:00 2001 From: Maya Barnea Date: Tue, 15 Jul 2025 10:20:15 +0300 Subject: [PATCH 6/7] fix utils_test - initialize random before Signed-off-by: Maya Barnea --- pkg/llm-d-inference-sim/utils_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/llm-d-inference-sim/utils_test.go b/pkg/llm-d-inference-sim/utils_test.go index 425c09a..14d8b2f 100644 --- a/pkg/llm-d-inference-sim/utils_test.go +++ b/pkg/llm-d-inference-sim/utils_test.go @@ -18,12 +18,17 @@ package llmdinferencesim import ( "strings" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) -var _ = Describe("Utils", func() { +var _ = Describe("Utils", Ordered, func() { + BeforeAll(func() { + initRandom(time.Now().UnixNano()) + }) + Context("GetRandomResponseText", func() { It("should return complete text", func() { text, finishReason := getRandomResponseText(nil) From 3c699b86a04b390e823b8423dea42946da3b3e39 Mon Sep 17 00:00:00 2001 From: Maya Barnea Date: Tue, 15 Jul 2025 13:02:14 +0300 Subject: [PATCH 7/7] fixes in readme according the PR review Signed-off-by: Maya Barnea --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c7a13a9..7d74205 100644 --- a/README.md +++ b/README.md @@ -29,11 +29,11 @@ The simulator supports two modes of operation: - `echo` mode: the response contains the same text that was received in the request. For `/v1/chat/completions` the last message for the role=`user` is used. - `random` mode: the response is randomly chosen from a set of pre-defined sentences. -Timing of the response is defined by `time-to-first-token` and `inter-token-latency` parameters. In case P/D is enabled for a request, `kv-cache-transfer-latency` will be used instead of `time-to-first-token`. +Timing of the response is defined by the `time-to-first-token` and `inter-token-latency` parameters. In case P/D is enabled for a request, `kv-cache-transfer-latency` will be used instead of `time-to-first-token`. For a request with `stream=true`: `time-to-first-token` or `kv-cache-transfer-latency` defines the delay before the first token is returned, `inter-token-latency` defines the delay between subsequent tokens in the stream. -For a requst with `stream=false`: the response is returned after delay of `<time to first token> + (<inter token latency> * (<number of tokens> - 1))` +For a requst with `stream=false`: the response is returned after delay of `<time to first token> + (<inter token latency> * (<number of tokens> - 1))` or `<kv-cache-transfer-latency> + (<inter token latency> * (<number of tokens> - 1))` in P/D case It can be run standalone or in a Pod for testing under packages such as Kind.
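As a worked example of the `stream=false` timing described in the README changes above, the sketch below computes the total wait the way `sendResponse` does in this series: the first-token delay, replaced by `kv-cache-transfer-latency` when the prefill was done on a remote pod (the P/D case), plus `inter-token-latency` for every additional token. It is an illustrative stand-alone snippet with assumed example values, not simulator code.

```go
package main

import "fmt"

// totalResponseDelayMillis follows the computation in sendResponse:
// first-token delay + (numOfTokens-1) * inter-token latency, where the
// first-token delay is kv-cache-transfer-latency when the prefill was done
// on a remote pod (P/D), and time-to-first-token otherwise.
func totalResponseDelayMillis(timeToFirstToken, interTokenLatency, kvCacheTransferLatency, numOfTokens int, doRemotePrefill bool) int {
	firstTokenDelay := timeToFirstToken
	if doRemotePrefill {
		firstTokenDelay = kvCacheTransferLatency
	}
	return firstTokenDelay + (numOfTokens-1)*interTokenLatency
}

func main() {
	// Latencies from manifests/config.yaml in this series, 5 completion tokens.
	fmt.Println(totalResponseDelayMillis(2000, 1000, 100, 5, false)) // 6000 ms
	fmt.Println(totalResponseDelayMillis(2000, 1000, 100, 5, true))  // 4100 ms in the P/D case
}
```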