Skip to content

Commit 0b75804

Browse files
committed
Refactor BackendManager / BackendStorage.
This also fixes #294
1 parent a650c3b commit 0b75804

11 files changed

+865
-983
lines changed

pkg/server/backend_manager.go

Lines changed: 74 additions & 289 deletions
Large diffs are not rendered by default.

pkg/server/backend_manager_test.go

Lines changed: 181 additions & 254 deletions
Large diffs are not rendered by default.

pkg/server/default_route_backend_manager.go

Lines changed: 0 additions & 59 deletions
This file was deleted.

pkg/server/desthost_backend_manager.go

Lines changed: 0 additions & 86 deletions
This file was deleted.

pkg/server/readiness_manager.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,4 @@ type ReadinessManager interface {
2323
Ready() (bool, string)
2424
}
2525

26-
var _ ReadinessManager = &DefaultBackendStorage{}
27-
28-
func (s *DefaultBackendStorage) Ready() (bool, string) {
29-
if s.NumBackends() == 0 {
30-
return false, "no connection to any proxy agent"
31-
}
32-
return true, ""
33-
}
26+
var _ ReadinessManager = &DefaultBackendManager{}

pkg/server/server.go

Lines changed: 8 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,6 @@ type ProxyClientConnection struct {
106106
dialAddress string // cached for logging
107107
}
108108

109-
const (
110-
destHostKey key = iota
111-
)
112-
113109
func (c *ProxyClientConnection) send(pkt *client.Packet) error {
114110
defer func(start time.Time) { metrics.Metrics.ObserveFrontendWriteLatency(time.Since(start)) }(time.Now())
115111
if c.Mode == ModeGRPC {
@@ -193,8 +189,7 @@ func (pm *PendingDialManager) removeForStream(streamUID string) []*ProxyClientCo
193189

194190
// ProxyServer
195191
type ProxyServer struct {
196-
// BackendManagers contains a list of BackendManagers
197-
BackendManagers []BackendManager
192+
BackendManager BackendManager
198193

199194
// Readiness reports if the proxy server is ready, i.e., if the proxy
200195
// server has connections to proxy agents (backends). Note that the
@@ -215,9 +210,6 @@ type ProxyServer struct {
215210

216211
// agent authentication
217212
AgentAuthenticationOptions *AgentTokenAuthenticationOptions
218-
219-
// TODO: move strategies into BackendStorage
220-
proxyStrategies []ProxyStrategy
221213
}
222214

223215
// AgentTokenAuthenticationOptions contains list of parameters required for agent token based authentication
@@ -233,45 +225,17 @@ var _ agent.AgentServiceServer = &ProxyServer{}
233225

234226
var _ client.ProxyServiceServer = &ProxyServer{}
235227

236-
func genContext(proxyStrategies []ProxyStrategy, reqHost string) context.Context {
237-
ctx := context.Background()
238-
for _, ps := range proxyStrategies {
239-
switch ps {
240-
case ProxyStrategyDestHost:
241-
addr := util.RemovePortFromHost(reqHost)
242-
ctx = context.WithValue(ctx, destHostKey, addr)
243-
}
244-
}
245-
return ctx
246-
}
247-
248228
func (s *ProxyServer) getBackend(reqHost string) (Backend, error) {
249-
ctx := genContext(s.proxyStrategies, reqHost)
250-
for _, bm := range s.BackendManagers {
251-
be, err := bm.Backend(ctx)
252-
if err == nil {
253-
return be, nil
254-
}
255-
if ignoreNotFound(err) != nil {
256-
// if can't find a backend through current BackendManager, move on
257-
// to the next one
258-
return nil, err
259-
}
260-
}
261-
return nil, &ErrNotFound{}
229+
addr := util.RemovePortFromHost(reqHost)
230+
return s.BackendManager.Backend(addr)
262231
}
263232

264233
func (s *ProxyServer) addBackend(backend Backend) {
265-
// TODO: refactor BackendStorage to acquire lock once, not up to 3 times.
266-
for _, bm := range s.BackendManagers {
267-
bm.AddBackend(backend)
268-
}
234+
s.BackendManager.AddBackend(backend)
269235
}
270236

271237
func (s *ProxyServer) removeBackend(backend Backend) {
272-
for _, bm := range s.BackendManagers {
273-
bm.RemoveBackend(backend)
274-
}
238+
s.BackendManager.RemoveBackend(backend)
275239
}
276240

277241
func (s *ProxyServer) addEstablished(agentID string, connID int64, p *ProxyClientConnection) {
@@ -377,30 +341,16 @@ func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClien
377341

378342
// NewProxyServer creates a new ProxyServer instance
379343
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer {
380-
var bms []BackendManager
381-
for _, ps := range proxyStrategies {
382-
switch ps {
383-
case ProxyStrategyDestHost:
384-
bms = append(bms, NewDestHostBackendManager())
385-
case ProxyStrategyDefault:
386-
bms = append(bms, NewDefaultBackendManager())
387-
case ProxyStrategyDefaultRoute:
388-
bms = append(bms, NewDefaultRouteBackendManager())
389-
default:
390-
klog.ErrorS(nil, "Unknown proxy strategy", "strategy", ps)
391-
}
392-
}
344+
bm := NewDefaultBackendManager(proxyStrategies)
393345

394346
return &ProxyServer{
395347
established: make(map[string](map[int64]*ProxyClientConnection)),
396348
PendingDial: NewPendingDialManager(),
397349
serverID: serverID,
398350
serverCount: serverCount,
399-
BackendManagers: bms,
351+
BackendManager: bm,
400352
AgentAuthenticationOptions: agentAuthenticationOptions,
401-
// use the first backend-manager as the Readiness Manager
402-
Readiness: bms[0],
403-
proxyStrategies: proxyStrategies,
353+
Readiness: bm,
404354
}
405355
}
406356

0 commit comments

Comments
 (0)