Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 4 additions & 17 deletions nomad/client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,28 +156,15 @@ func (s *Server) serverWithNodeConn(nodeID, region string) (*peers.Parts, error)
}

// Select the list of servers to check based on what region we are querying
s.peerLock.RLock()

var rawTargets []*peers.Parts
var targets []*peers.Parts
if region == s.Region() {
rawTargets = make([]*peers.Parts, 0, len(s.localPeers))
for _, srv := range s.localPeers {
rawTargets = append(rawTargets, srv)
}
targets = s.peersCache.LocalPeers()
} else {
peers, ok := s.peers[region]
if !ok {
s.peerLock.RUnlock()
targets = s.peersCache.RegionPeers(region)
if targets == nil {
return nil, structs.ErrNoRegionPath
}
rawTargets = peers
}

targets := make([]*peers.Parts, 0, len(rawTargets))
for _, target := range rawTargets {
targets = append(targets, target.Copy())
}
s.peerLock.RUnlock()

// connections is used to store the servers that have connections to the
// requested node.
Expand Down
13 changes: 1 addition & 12 deletions nomad/encrypter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/crypto"
"github.com/hashicorp/nomad/helper/joseutil"
"github.com/hashicorp/nomad/nomad/peers"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/raft"
Expand Down Expand Up @@ -1189,7 +1188,7 @@ func (krr *KeyringReplicator) replicateKey(ctx context.Context, wrappedKeys *str
cfg := krr.srv.GetConfig()
self := fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Region)

for _, peer := range krr.getAllPeers() {
for _, peer := range krr.srv.peersCache.LocalPeers() {
if peer.Name == self {
continue
}
Expand Down Expand Up @@ -1222,13 +1221,3 @@ func (krr *KeyringReplicator) replicateKey(ctx context.Context, wrappedKeys *str
krr.logger.Debug("added key", "key", keyID)
return nil
}

func (krr *KeyringReplicator) getAllPeers() []*peers.Parts {
krr.srv.peerLock.RLock()
defer krr.srv.peerLock.RUnlock()
peers := make([]*peers.Parts, 0, len(krr.srv.localPeers))
for _, peer := range krr.srv.localPeers {
peers = append(peers, peer.Copy())
}
return peers
}
15 changes: 1 addition & 14 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,6 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return err
}

n.srv.peerLock.RLock()
defer n.srv.peerLock.RUnlock()
if err := n.constructNodeServerInfoResponse(args.Node.ID, snap, reply); err != nil {
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
return err
Expand Down Expand Up @@ -467,14 +465,7 @@ func (n *Node) constructNodeServerInfoResponse(nodeID string, snap *state.StateS
reply.LeaderRPCAddr = string(leaderAddr)

// Reply with config information required for future RPC requests
reply.Servers = make([]*structs.NodeServerInfo, 0, len(n.srv.localPeers))
for _, v := range n.srv.localPeers {
reply.Servers = append(reply.Servers,
&structs.NodeServerInfo{
RPCAdvertiseAddr: v.RPCAddr.String(),
Datacenter: v.Datacenter,
})
}
reply.Servers = n.srv.peersCache.LocalPeersServerInfo()

ws := memdb.NewWatchSet()

Expand Down Expand Up @@ -879,8 +870,6 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct

// Set the reply index and leader
reply.Index = index
n.srv.peerLock.RLock()
defer n.srv.peerLock.RUnlock()
if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil {
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
return err
Expand Down Expand Up @@ -1158,8 +1147,6 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp
// Set the reply index
reply.Index = evalIndex

n.srv.peerLock.RLock()
defer n.srv.peerLock.RUnlock()
if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil {
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
return err
Expand Down
12 changes: 3 additions & 9 deletions nomad/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,8 @@ func (tc testcluster) anyFollowerRaftServerID() raft.ServerID {

var tgtID raft.ServerID

s1.peerLock.Lock()
defer s1.peerLock.Unlock()

// Find the first non-leader server in the list.
for _, sp := range s1.localPeers {
for _, sp := range s1.peersCache.LocalPeers() {
tgtID = raft.ServerID(sp.ID)
if tgtID != ldrID {
break
Expand All @@ -346,12 +343,9 @@ func (tc testcluster) anyFollowerRaftServerAddress() raft.ServerAddress {

var addr raft.ServerAddress

s1.peerLock.Lock()
defer s1.peerLock.Unlock()

// Find the first non-leader server in the list.
for a := range s1.localPeers {
addr = a
for _, a := range s1.peersCache.LocalPeers() {
addr = raft.ServerAddress(a.Addr.String())
if addr != lAddr {
break
}
Expand Down
Loading
Loading