Skip to content

Commit 7ed7cf8

Browse files
committed
ING-1203: Refactored KV client management
1 parent bed9721 commit 7ed7cf8

36 files changed

+2444
-3255
lines changed

agent.go

Lines changed: 43 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@ import (
1919
var buildVersion string = buildversion.GetVersion("github.com/couchbase/gocbcorex")
2020

2121
type agentState struct {
22-
bucket string
23-
tlsConfig *tls.Config
24-
authenticator Authenticator
25-
numPoolConnections uint
26-
httpTransport *http.Transport
22+
bucket string
23+
tlsConfig *tls.Config
24+
authenticator Authenticator
25+
httpTransport *http.Transport
2726

28-
lastClients map[string]*KvClientConfig
2927
latestConfig *ParsedConfig
3028
}
3129

@@ -37,7 +35,8 @@ type Agent struct {
3735
state agentState
3836

3937
cfgWatcher ConfigWatcher
40-
connMgr KvClientManager
38+
connMgr KvEndpointClientManager
39+
mconnMgr MultiKvEndpointClientManager
4140
collections CollectionResolver
4241
retries RetryManager
4342
vbRouter VbucketRouter
@@ -148,12 +147,11 @@ func CreateAgent(ctx context.Context, opts AgentOptions) (*Agent, error) {
148147
networkType: networkType,
149148

150149
state: agentState{
151-
bucket: opts.BucketName,
152-
tlsConfig: opts.TLSConfig,
153-
authenticator: opts.Authenticator,
154-
numPoolConnections: connectionPoolSize,
155-
latestConfig: bootstrapConfig,
156-
httpTransport: httpTransport,
150+
bucket: opts.BucketName,
151+
tlsConfig: opts.TLSConfig,
152+
authenticator: opts.Authenticator,
153+
latestConfig: bootstrapConfig,
154+
httpTransport: httpTransport,
157155
},
158156
}
159157
if opts.RetryManager == nil {
@@ -164,20 +162,29 @@ func CreateAgent(ctx context.Context, opts AgentOptions) (*Agent, error) {
164162

165163
agentComponentConfigs := agent.genAgentComponentConfigsLocked()
166164

167-
connMgr, err := NewKvClientManager(&KvClientManagerConfig{
168-
NumPoolConnections: agent.state.numPoolConnections,
169-
Clients: agentComponentConfigs.KvClientManagerClients,
170-
}, &KvClientManagerOptions{
171-
Logger: agent.logger.Named("client-manager"),
165+
mconnMgr, err := NewMultiKvEndpointClientManager(&MultiKvEndpointClientManagerOptions{
166+
Logger: agent.logger.Named("multi-client-manager"),
167+
Endpoints: agentComponentConfigs.KvTargets,
168+
Auth: agentComponentConfigs.KvAuth,
169+
})
170+
if err != nil {
171+
return nil, handleAgentCreateErr(err)
172+
}
173+
agent.mconnMgr = mconnMgr
174+
175+
connMgr, err := mconnMgr.NewManager(NewManagerOptions{
176+
NumPoolConnections: connectionPoolSize,
177+
OnDemandConnect: false,
178+
SelectedBucket: agentComponentConfigs.KvSelectedBucket,
172179
})
173180
if err != nil {
174181
return nil, handleAgentCreateErr(err)
175182
}
176183
agent.connMgr = connMgr
177184

178185
coreCollections, err := NewCollectionResolverMemd(&CollectionResolverMemdOptions{
179-
Logger: agent.logger,
180-
ConnMgr: agent.connMgr,
186+
Logger: agent.logger,
187+
ConnProvider: agent.connMgr,
181188
})
182189
if err != nil {
183190
return nil, handleAgentCreateErr(err)
@@ -213,9 +220,9 @@ func CreateAgent(ctx context.Context, opts AgentOptions) (*Agent, error) {
213220
configWatcher, err := NewConfigWatcherMemd(
214221
&agentComponentConfigs.ConfigWatcherMemdConfig,
215222
&ConfigWatcherMemdOptions{
216-
Logger: logger.Named("memd-config-watcher"),
217-
KvClientManager: connMgr,
218-
PollingPeriod: 2500 * time.Millisecond,
223+
Logger: logger.Named("memd-config-watcher"),
224+
ClientProvider: connMgr,
225+
PollingPeriod: 2500 * time.Millisecond,
219226
},
220227
)
221228
if err != nil {
@@ -258,12 +265,12 @@ func CreateAgent(ctx context.Context, opts AgentOptions) (*Agent, error) {
258265
}
259266

260267
agent.crud = &CrudComponent{
261-
logger: agent.logger,
262-
collections: agent.collections,
263-
retries: consistencyRetryMgr,
264-
connManager: agent.connMgr,
265-
nmvHandler: &agentNmvHandler{agent},
266-
vbs: agent.vbRouter,
268+
logger: agent.logger,
269+
collections: agent.collections,
270+
retries: consistencyRetryMgr,
271+
connProvider: agent.connMgr,
272+
nmvHandler: &agentNmvHandler{agent},
273+
vbs: agent.vbRouter,
267274
compression: &CompressionManagerDefault{
268275
disableCompression: !useCompression,
269276
compressionMinSize: compressionMinSize,
@@ -370,8 +377,8 @@ func (agent *Agent) NumVbuckets() int {
370377
}
371378

372379
func (agent *Agent) Close() error {
373-
if err := agent.connMgr.Close(); err != nil {
374-
agent.logger.Debug("Failed to close conn mgr", zap.Error(err))
380+
if err := agent.mconnMgr.Close(); err != nil {
381+
agent.logger.Debug("Failed to close multi conn mgr", zap.Error(err))
375382
}
376383

377384
agent.cfgWatcherCancel()
@@ -412,41 +419,23 @@ func (agent *Agent) updateStateLocked() {
412419
// the routing table. Then go back and remove the old entries from
413420
// the connection manager list.
414421

415-
oldClients := make(map[string]*KvClientConfig)
416-
if agent.state.lastClients != nil {
417-
for clientName, client := range agent.state.lastClients {
418-
oldClients[clientName] = client
419-
}
420-
}
421-
for clientName, client := range agentComponentConfigs.KvClientManagerClients {
422-
if oldClients[clientName] == nil {
423-
oldClients[clientName] = client
424-
}
425-
}
426-
427-
err := agent.connMgr.Reconfigure(&KvClientManagerConfig{
428-
NumPoolConnections: agent.state.numPoolConnections,
429-
Clients: oldClients,
430-
})
422+
err := agent.mconnMgr.UpdateEndpoints(agentComponentConfigs.KvTargets, true)
431423
if err != nil {
432-
agent.logger.Error("failed to reconfigure connection manager (old clients)", zap.Error(err))
424+
agent.logger.Error("failed to add-only update kv connection manager endpoint", zap.Error(err))
433425
}
434426

435427
agent.vbRouter.UpdateRoutingInfo(agentComponentConfigs.VbucketRoutingInfo)
436428

437429
if agent.memdCfgWatcher != nil {
438-
err = agent.memdCfgWatcher.Reconfigure(&agentComponentConfigs.ConfigWatcherMemdConfig)
430+
err := agent.memdCfgWatcher.Reconfigure(&agentComponentConfigs.ConfigWatcherMemdConfig)
439431
if err != nil {
440432
agent.logger.Error("failed to reconfigure memd config watcher component", zap.Error(err))
441433
}
442434
}
443435

444-
err = agent.connMgr.Reconfigure(&KvClientManagerConfig{
445-
NumPoolConnections: agent.state.numPoolConnections,
446-
Clients: agentComponentConfigs.KvClientManagerClients,
447-
})
436+
err = agent.mconnMgr.UpdateEndpoints(agentComponentConfigs.KvTargets, false)
448437
if err != nil {
449-
agent.logger.Error("failed to reconfigure connection manager (updated clients)", zap.Error(err))
438+
agent.logger.Error("failed to update kv connection manager endpoint", zap.Error(err))
450439
}
451440

452441
err = agent.query.Reconfigure(&agentComponentConfigs.QueryComponentConfig)

authenticator.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,23 @@ func (a *PasswordAuthenticator) GetCredentials(
2525
) (string, string, error) {
2626
return a.Username, a.Password, nil
2727
}
28+
29+
type kvClientAuth struct {
30+
Authenticator Authenticator
31+
}
32+
33+
var _ KvClientAuth = (*kvClientAuth)(nil)
34+
35+
func (v *kvClientAuth) GetAuth(address string) (username, password string, clientCert *tls.Certificate, err error) {
36+
clientCert, err = v.Authenticator.GetClientCertificate(ServiceTypeMemd, address)
37+
if err != nil {
38+
return "", "", nil, err
39+
}
40+
41+
username, password, err = v.Authenticator.GetCredentials(ServiceTypeMemd, address)
42+
if err != nil {
43+
return "", "", nil, err
44+
}
45+
46+
return username, password, clientCert, nil
47+
}

collectionresolver_memd.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import (
88
)
99

1010
type CollectionResolverMemdOptions struct {
11-
Logger *zap.Logger
12-
ConnMgr KvClientManager
11+
Logger *zap.Logger
12+
ConnProvider KvClientProvider
1313
}
1414

1515
type CollectionResolverMemd struct {
16-
logger *zap.Logger
17-
connMgr KvClientManager
16+
logger *zap.Logger
17+
connProvider KvClientProvider
1818
}
1919

2020
var _ CollectionResolver = (*CollectionResolverMemd)(nil)
@@ -25,16 +25,16 @@ func NewCollectionResolverMemd(opts *CollectionResolverMemdOptions) (*Collection
2525
}
2626

2727
return &CollectionResolverMemd{
28-
logger: loggerOrNop(opts.Logger),
29-
connMgr: opts.ConnMgr,
28+
logger: loggerOrNop(opts.Logger),
29+
connProvider: opts.ConnProvider,
3030
}, nil
3131
}
3232

3333
func (cr *CollectionResolverMemd) ResolveCollectionID(
3434
ctx context.Context, scopeName, collectionName string,
3535
) (collectionId uint32, manifestRev uint64, err error) {
36-
resp, err := OrchestrateRandomMemdClient(
37-
ctx, cr.connMgr,
36+
resp, err := OrchestrateKvClient(
37+
ctx, cr.connProvider,
3838
func(client KvClient) (*memdx.GetCollectionIDResponse, error) {
3939
return client.GetCollectionID(ctx, &memdx.GetCollectionIDRequest{
4040
ScopeName: scopeName,

componentconfigs.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import (
99
type AgentComponentConfigs struct {
1010
ConfigWatcherHttpConfig ConfigWatcherHttpConfig
1111
ConfigWatcherMemdConfig ConfigWatcherMemdConfig
12-
KvClientManagerClients map[string]*KvClientConfig
12+
KvTargets map[string]KvTarget
13+
KvAuth KvClientAuth
14+
KvSelectedBucket string
1315
VbucketRoutingInfo *VbucketRoutingInfo
1416
QueryComponentConfig QueryComponentConfig
1517
MgmtComponentConfig MgmtComponentConfig
@@ -83,14 +85,20 @@ func GenerateComponentConfigsFromConfig(
8385
}
8486
}
8587

86-
clients := make(map[string]*KvClientConfig)
88+
var kvTlsConfig *KvTargetTlsConfig
89+
if tlsConfig != nil {
90+
kvTlsConfig = &KvTargetTlsConfig{
91+
InsecureSkipVerify: tlsConfig.InsecureSkipVerify,
92+
RootCAs: tlsConfig.RootCAs,
93+
CipherSuites: tlsConfig.CipherSuites,
94+
}
95+
}
96+
97+
kvTargets := make(map[string]KvTarget, len(kvDataHosts))
8798
for nodeId, addr := range kvDataHosts {
88-
clients[nodeId] = &KvClientConfig{
89-
Address: addr,
90-
TlsConfig: tlsConfig,
91-
ClientName: clientName,
92-
SelectedBucket: bucketName,
93-
Authenticator: authenticator,
99+
kvTargets[nodeId] = KvTarget{
100+
Address: addr,
101+
TLSConfig: kvTlsConfig,
94102
}
95103
}
96104

@@ -110,7 +118,11 @@ func GenerateComponentConfigsFromConfig(
110118
ConfigWatcherMemdConfig: ConfigWatcherMemdConfig{
111119
Endpoints: kvDataNodeIds,
112120
},
113-
KvClientManagerClients: clients,
121+
KvTargets: kvTargets,
122+
KvAuth: &kvClientAuth{
123+
Authenticator: authenticator,
124+
},
125+
KvSelectedBucket: bucketName,
114126
VbucketRoutingInfo: &VbucketRoutingInfo{
115127
VbMap: config.VbucketMap,
116128
ServerList: kvDataNodeIds,

configwatcher_memd.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,29 @@ type ConfigWatcherMemdConfig struct {
1616
}
1717

1818
type ConfigWatcherMemdOptions struct {
19-
Logger *zap.Logger
20-
KvClientManager KvClientManager
21-
PollingPeriod time.Duration
19+
Logger *zap.Logger
20+
ClientProvider KvEndpointClientProvider
21+
PollingPeriod time.Duration
2222
}
2323

2424
type configWatcherMemdState struct {
2525
endpoints []string
2626
}
2727

2828
type ConfigWatcherMemd struct {
29-
logger *zap.Logger
30-
kvClientManager KvClientManager
31-
pollingPeriod time.Duration
29+
logger *zap.Logger
30+
clientProvider KvEndpointClientProvider
31+
pollingPeriod time.Duration
3232

3333
lock sync.Mutex
3434
state *configWatcherMemdState
3535
}
3636

3737
func NewConfigWatcherMemd(config *ConfigWatcherMemdConfig, opts *ConfigWatcherMemdOptions) (*ConfigWatcherMemd, error) {
3838
return &ConfigWatcherMemd{
39-
logger: opts.Logger,
40-
kvClientManager: opts.KvClientManager,
41-
pollingPeriod: opts.PollingPeriod,
39+
logger: opts.Logger,
40+
clientProvider: opts.ClientProvider,
41+
pollingPeriod: opts.PollingPeriod,
4242
state: &configWatcherMemdState{
4343
endpoints: config.Endpoints,
4444
},
@@ -57,10 +57,10 @@ func (w *ConfigWatcherMemd) Reconfigure(config *ConfigWatcherMemdConfig) error {
5757
func configWatcherMemd_pollOne(
5858
ctx context.Context,
5959
logger *zap.Logger,
60-
kvClientManager KvClientManager,
60+
clientProvider KvEndpointClientProvider,
6161
endpoint string,
6262
) (*ParsedConfig, error) {
63-
client, err := kvClientManager.GetClient(ctx, endpoint)
63+
client, err := clientProvider.GetEndpointClient(ctx, endpoint)
6464
if err != nil {
6565
return nil, err
6666
}
@@ -145,7 +145,7 @@ func (w *ConfigWatcherMemd) watchThread(ctx context.Context, outCh chan<- *Parse
145145
parsedConfig, err := configWatcherMemd_pollOne(
146146
pollCtx,
147147
w.logger,
148-
w.kvClientManager,
148+
w.clientProvider,
149149
endpoint)
150150
cancel()
151151
if err != nil {

0 commit comments

Comments
 (0)