@@ -32,7 +32,6 @@ import (
32
32
33
33
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
34
34
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher"
35
- borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts"
36
35
borweininfsvc "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc"
37
36
borweintypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/types"
38
37
borweinutils "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils"
@@ -62,14 +61,12 @@ type BorweinModelResultFetcher struct {
62
61
name string
63
62
qosConfig * generic.QoSConfiguration
64
63
65
- nodeFeatureNames []string // handled by GetNodeFeature
66
- containerFeatureNames []string // handled by GetContainerFeature
67
- inferenceServiceSocketAbsPath string
64
+ nodeFeatureNames []string // handled by GetNodeFeature
65
+ containerFeatureNames []string // handled by GetContainerFeature
68
66
modelNameToInferenceSvcSockAbsPath map [string ]string // map modelName to inference server sock path
69
67
70
68
emitter metrics.MetricEmitter
71
69
72
- infSvcClient borweininfsvc.InferenceServiceClient
73
70
modelNameToInferenceSvcClient map [string ]borweininfsvc.InferenceServiceClient // map modelName to its inference client
74
71
clientLock sync.RWMutex
75
72
}
@@ -137,7 +134,7 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met
137
134
metaWriter metacache.MetaWriter , metaServer * metaserver.MetaServer ,
138
135
) error {
139
136
bmrf .clientLock .RLock ()
140
- if bmrf . infSvcClient == nil && len (bmrf .modelNameToInferenceSvcClient ) == 0 {
137
+ if len (bmrf .modelNameToInferenceSvcClient ) == 0 {
141
138
bmrf .clientLock .RUnlock ()
142
139
return fmt .Errorf ("infSvcClient isn't initialized" )
143
140
}
@@ -175,16 +172,7 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met
175
172
return fmt .Errorf ("getInferenceRequestForPods failed with error: %v" , err )
176
173
}
177
174
178
- bmrf .clientLock .RLock ()
179
- var infSvcClients map [string ]borweininfsvc.InferenceServiceClient
180
- if len (bmrf .modelNameToInferenceSvcClient ) > 0 {
181
- infSvcClients = bmrf .modelNameToInferenceSvcClient
182
- } else {
183
- infSvcClients = map [string ]borweininfsvc.InferenceServiceClient {
184
- borweinconsts .ModelNameBorwein : bmrf .infSvcClient ,
185
- }
186
- }
187
- bmrf .clientLock .RUnlock ()
175
+ infSvcClients := bmrf .modelNameToInferenceSvcClient
188
176
189
177
errCh := make (chan error , len (infSvcClients ))
190
178
for modelName , client := range infSvcClients {
@@ -380,57 +368,44 @@ func (bmrf *BorweinModelResultFetcher) initInferenceSvcClientConn() (bool, error
380
368
// todo: emit metrics when initializing client connection failed
381
369
382
370
// never success
383
- if bmrf . inferenceServiceSocketAbsPath == "" && len (bmrf .modelNameToInferenceSvcSockAbsPath ) == 0 {
371
+ if len (bmrf .modelNameToInferenceSvcSockAbsPath ) == 0 {
384
372
return false , fmt .Errorf ("empty inference service socks information" )
385
373
}
386
374
387
- if len (bmrf .modelNameToInferenceSvcSockAbsPath ) > 0 {
388
- modelNameToConn := make (map [string ]* grpc.ClientConn , len (bmrf .modelNameToInferenceSvcSockAbsPath ))
389
-
390
- allSuccess := true
391
- for modelName , sockAbsPath := range bmrf .modelNameToInferenceSvcSockAbsPath {
392
- infSvcConn , err := process .Dial (sockAbsPath , 5 * time .Second )
393
- if err != nil {
394
- general .Errorf ("get inference svc connection with socket: %s for model: %s failed with error" ,
395
- sockAbsPath , modelName )
396
- allSuccess = false
397
- break
398
- }
399
- general .Infof ("init inference svc connection with socket: %s for model: %s success" , sockAbsPath , modelName )
400
-
401
- modelNameToConn [modelName ] = infSvcConn
402
- }
375
+ modelNameToConn := make (map [string ]* grpc.ClientConn , len (bmrf .modelNameToInferenceSvcSockAbsPath ))
403
376
404
- if ! allSuccess {
405
- for modelName , conn := range modelNameToConn {
406
- err := conn .Close ()
407
- if err != nil {
408
- general .Errorf ("close connection for model: %s failed with error: %v" ,
409
- modelName , err )
410
- }
411
- }
412
- } else {
413
- bmrf .clientLock .Lock ()
414
- bmrf .modelNameToInferenceSvcClient = make (map [string ]borweininfsvc.InferenceServiceClient , len (modelNameToConn ))
415
- for modelName , conn := range modelNameToConn {
416
- bmrf .modelNameToInferenceSvcClient [modelName ] = borweininfsvc .NewInferenceServiceClient (conn )
417
- }
418
- bmrf .clientLock .Unlock ()
377
+ allSuccess := true
378
+ for modelName , sockAbsPath := range bmrf .modelNameToInferenceSvcSockAbsPath {
379
+ infSvcConn , err := process .Dial (sockAbsPath , 5 * time .Second )
380
+ if err != nil {
381
+ general .Errorf ("get inference svc connection with socket: %s for model: %s failed with error" ,
382
+ sockAbsPath , modelName )
383
+ allSuccess = false
384
+ break
419
385
}
386
+ general .Infof ("init inference svc connection with socket: %s for model: %s success" , sockAbsPath , modelName )
420
387
421
- return allSuccess , nil
388
+ modelNameToConn [ modelName ] = infSvcConn
422
389
}
423
390
424
- infSvcConn , err := process .Dial (bmrf .inferenceServiceSocketAbsPath , 5 * time .Second )
425
- if err != nil {
426
- general .Errorf ("get inference svc connection with socket: %s failed with error: %v" , bmrf .inferenceServiceSocketAbsPath , err )
427
- return false , nil
391
+ if ! allSuccess {
392
+ for modelName , conn := range modelNameToConn {
393
+ err := conn .Close ()
394
+ if err != nil {
395
+ general .Errorf ("close connection for model: %s failed with error: %v" ,
396
+ modelName , err )
397
+ }
398
+ }
399
+ } else {
400
+ bmrf .clientLock .Lock ()
401
+ bmrf .modelNameToInferenceSvcClient = make (map [string ]borweininfsvc.InferenceServiceClient , len (modelNameToConn ))
402
+ for modelName , conn := range modelNameToConn {
403
+ bmrf .modelNameToInferenceSvcClient [modelName ] = borweininfsvc .NewInferenceServiceClient (conn )
404
+ }
405
+ bmrf .clientLock .Unlock ()
428
406
}
429
407
430
- bmrf .clientLock .Lock ()
431
- bmrf .infSvcClient = borweininfsvc .NewInferenceServiceClient (infSvcConn )
432
- bmrf .clientLock .Unlock ()
433
- return true , nil
408
+ return allSuccess , nil
434
409
}
435
410
436
411
func NewBorweinModelResultFetcher (fetcherName string , conf * config.Configuration , extraConf interface {},
@@ -455,11 +430,10 @@ func NewBorweinModelResultFetcher(fetcherName string, conf *config.Configuration
455
430
qosConfig : conf .QoSConfiguration ,
456
431
nodeFeatureNames : conf .BorweinConfiguration .NodeFeatureNames ,
457
432
containerFeatureNames : conf .BorweinConfiguration .ContainerFeatureNames ,
458
- inferenceServiceSocketAbsPath : conf .BorweinConfiguration .InferenceServiceSocketAbsPath ,
459
433
modelNameToInferenceSvcSockAbsPath : conf .BorweinConfiguration .ModelNameToInferenceSvcSockAbsPath ,
460
434
}
461
435
462
- // fetcher initializing doesn't block sys-adviosr main process
436
+ // fetcher initializing doesn't block sys-advisor main process
463
437
go func () {
464
438
err := wait .PollImmediateInfinite (5 * time .Second , bmrf .initInferenceSvcClientConn )
465
439
if err != nil {
0 commit comments