Skip to content

Commit 64a391c

Browse files
authored
fixed consul xresolver, service discovery and improved logging (#490)
* fixed consul xresolver and improved logging * adding pr url * fixed consul service discovery to pass QueryOptions
1 parent f07406b commit 64a391c

File tree

6 files changed

+63
-42
lines changed

6 files changed

+63
-42
lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
55
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
66

77
## [Unreleased]
8-
8+
- fixed `ConsulWatch` in xresolver by storing and watching the correct part of the url [#490](https://github.com/xmidt-org/webpa-common/pull/490)
9+
- fixed consul service discovery to pass QueryOptions [#490](https://github.com/xmidt-org/webpa-common/pull/490)
910

1011
## [v1.10.1]
1112
### Fixed
12-
- Device metadata didn't return a read-only view of its map claims resulting in data races [483](https://github.com/xmidt-org/webpa-common/pull/483)
13+
- Device metadata didn't return a read-only view of its map claims resulting in data races [#483](https://github.com/xmidt-org/webpa-common/pull/483)
1314

1415

1516
## [v1.10.0]

service/consul/instancer.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ func NewInstancer(o InstancerOptions) sd.Instancer {
3737
}
3838

3939
i := &instancer{
40-
client: o.Client,
41-
logger: log.With(o.Logger, "service", o.Service, "tags", fmt.Sprint(o.Tags), "passingOnly", o.PassingOnly, "datacenter", o.QueryOptions.Datacenter),
42-
service: o.Service,
43-
passingOnly: o.PassingOnly,
44-
stop: make(chan struct{}),
45-
registry: make(map[chan<- sd.Event]bool),
40+
client: o.Client,
41+
logger: log.With(o.Logger, "service", o.Service, "tags", fmt.Sprint(o.Tags), "passingOnly", o.PassingOnly, "datacenter", o.QueryOptions.Datacenter),
42+
service: o.Service,
43+
passingOnly: o.PassingOnly,
44+
queryOptions: o.QueryOptions,
45+
stop: make(chan struct{}),
46+
registry: make(map[chan<- sd.Event]bool),
4647
}
4748

4849
if len(o.Tags) > 0 {

xresolver/consul/consullistener.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,19 @@ import (
44
"context"
55
"errors"
66
"github.com/go-kit/kit/log"
7+
"github.com/go-kit/kit/log/level"
78
"github.com/xmidt-org/webpa-common/logging"
89
"github.com/xmidt-org/webpa-common/service/monitor"
910
"github.com/xmidt-org/webpa-common/xresolver"
11+
"net/url"
1012
"regexp"
1113
)
1214

1315
var find = regexp.MustCompile("(.*)" + regexp.QuoteMeta("[") + "(.*)" + regexp.QuoteMeta("]") + regexp.QuoteMeta("{") + "(.*)" + regexp.QuoteMeta("}"))
1416

1517
type Options struct {
1618
// Watch is what to url to match with the consul service
17-
// exp. { "beta.google.com" : "caduceus" }
19+
// exp. { "http://beta.google.com:8080/notify" : "caduceus" }
1820
Watch map[string]string `json:"watch"`
1921

2022
Logger log.Logger `json:"-"`
@@ -23,8 +25,6 @@ type Options struct {
2325
type ConsulWatcher struct {
2426
logger log.Logger
2527

26-
config Options
27-
2828
watch map[string]string
2929
balancers map[string]*xresolver.RoundRobin
3030
}
@@ -35,10 +35,7 @@ func NewConsulWatcher(o Options) *ConsulWatcher {
3535
}
3636

3737
watcher := &ConsulWatcher{
38-
logger: logging.Debug(o.Logger),
39-
40-
config: o,
41-
38+
logger: log.WithPrefix(o.Logger, "component", "consulWatcher"),
4239
balancers: make(map[string]*xresolver.RoundRobin),
4340
watch: make(map[string]string),
4441
}
@@ -53,7 +50,7 @@ func NewConsulWatcher(o Options) *ConsulWatcher {
5350
}
5451

5552
func (watcher *ConsulWatcher) MonitorEvent(e monitor.Event) {
56-
logging.Debug(watcher.logger, logging.MessageKey(), "received update route event", "event", e)
53+
log.WithPrefix(watcher.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "received update route event", "event", e)
5754

5855
// update balancers
5956
str := find.FindStringSubmatch(e.Key)
@@ -63,35 +60,43 @@ func (watcher *ConsulWatcher) MonitorEvent(e monitor.Event) {
6360

6461
service := str[1]
6562
if rr, found := watcher.balancers[service]; found {
66-
routes := make([]xresolver.Route, len(e.Instances))
67-
for index, instance := range e.Instances {
63+
routes := make([]xresolver.Route, 0)
64+
for _, instance := range e.Instances {
6865
// find records
6966
route, err := xresolver.CreateRoute(instance)
7067
if err != nil {
71-
logging.Error(watcher.logger, logging.MessageKey(), "failed to create route", logging.MessageKey(), err, "instance", instance)
68+
log.WithPrefix(watcher.logger, level.Key(), level.ErrorValue()).Log(logging.MessageKey(), "failed to create route", logging.MessageKey(), err, "instance", instance)
7269
continue
7370
}
74-
routes[index] = route
71+
routes = append(routes, route)
7572
}
7673
rr.Update(routes)
77-
logging.Info(watcher.logger, logging.MessageKey(), "updating routes", "service", service, "new-routes", routes)
74+
log.WithPrefix(watcher.logger, level.Key(), level.InfoValue()).Log(logging.MessageKey(), "updating routes", "service", service, "new-routes", routes)
7875
}
7976
}
8077

81-
func (watcher *ConsulWatcher) WatchService(url string, service string) {
82-
if _, found := watcher.watch[url]; !found {
83-
watcher.watch[url] = service
78+
func (watcher *ConsulWatcher) WatchService(watchURL string, service string) {
79+
parsedURL, err := url.Parse(watchURL)
80+
if err != nil {
81+
log.WithPrefix(watcher.logger, level.Key(), level.ErrorValue()).Log("failed to parse url", "url", watchURL, "service", service)
82+
return
83+
}
84+
log.WithPrefix(watcher.logger, level.Key(), level.InfoValue()).Log(logging.MessageKey(), "Watching Service", "url", watchURL, "service", service, "host", parsedURL.Hostname())
85+
86+
if _, found := watcher.watch[parsedURL.Hostname()]; !found {
87+
watcher.watch[parsedURL.Hostname()] = service
8488
if _, found := watcher.balancers[service]; !found {
8589
watcher.balancers[service] = xresolver.NewRoundRobinBalancer()
8690
}
8791
}
8892
}
8993

9094
func (watcher *ConsulWatcher) LookupRoutes(ctx context.Context, host string) ([]xresolver.Route, error) {
91-
if _, found := watcher.config.Watch[host]; !found {
95+
if _, found := watcher.watch[host]; !found {
96+
log.WithPrefix(watcher.logger, level.Key(), level.ErrorValue()).Log("watch not found ", "host", host)
9297
return []xresolver.Route{}, errors.New(host + " is not part of the consul listener")
9398
}
94-
records, err := watcher.balancers[watcher.config.Watch[host]].Get()
95-
logging.Debug(watcher.logger, logging.MessageKey(), "looking up routes", "routes", records, logging.ErrorKey(), err)
99+
records, err := watcher.balancers[watcher.watch[host]].Get()
100+
log.WithPrefix(watcher.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "looking up routes", "routes", records, logging.ErrorKey(), err)
96101
return records, err
97102
}

xresolver/consul/consullistener_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package consul
33
import (
44
"fmt"
55
"github.com/stretchr/testify/assert"
6+
"github.com/xmidt-org/webpa-common/logging"
67
"github.com/xmidt-org/webpa-common/service/monitor"
78
"github.com/xmidt-org/webpa-common/xresolver"
89
"io/ioutil"
@@ -19,8 +20,9 @@ func TestConsulWatcher(t *testing.T) {
1920
customport := "8080"
2021
service := "custom"
2122
expectedBody := "Hello World\n"
23+
fallBackURL := "http://" + net.JoinHostPort(customhost, customport)
2224

23-
//customInstance := "custom.host-A.com"
25+
// customInstance := "custom.host-A.com"
2426

2527
serverA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2628
fmt.Fprint(w, "a"+expectedBody)
@@ -33,7 +35,7 @@ func TestConsulWatcher(t *testing.T) {
3335
defer serverB.Close()
3436

3537
watcher := NewConsulWatcher(Options{
36-
Watch: map[string]string{customhost: service},
38+
Watch: map[string]string{fallBackURL: service},
3739
})
3840

3941
// note: MonitorEvent is Listen interface in the monitor package
@@ -44,13 +46,13 @@ func TestConsulWatcher(t *testing.T) {
4446

4547
client := &http.Client{
4648
Transport: &http.Transport{
47-
DialContext: xresolver.NewResolver(xresolver.DefaultDialer, watcher).DialContext,
49+
DialContext: xresolver.NewResolver(xresolver.DefaultDialer, logging.NewTestLogger(nil, t), watcher).DialContext,
4850
// note: DisableKeepAlives is required so when we do the request again we don't reuse the same connection.
4951
DisableKeepAlives: true,
5052
},
5153
}
5254

53-
req, err := http.NewRequest("GET", "http://"+net.JoinHostPort(customhost, customport), nil)
55+
req, err := http.NewRequest("GET", fallBackURL, nil)
5456
assert.NoError(err)
5557

5658
res, err := client.Do(req)
@@ -64,7 +66,7 @@ func TestConsulWatcher(t *testing.T) {
6466
assert.Equal("a"+expectedBody, string(body))
6567
}
6668

67-
req, err = http.NewRequest("GET", "http://"+net.JoinHostPort(customhost, customport), nil)
69+
req, err = http.NewRequest("GET", fallBackURL, nil)
6870
assert.NoError(err)
6971

7072
res, err = client.Do(req)

xresolver/xresolver.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ package xresolver
33
import (
44
"context"
55
"errors"
6+
"github.com/go-kit/kit/log"
7+
"github.com/go-kit/kit/log/level"
8+
"github.com/xmidt-org/webpa-common/logging"
9+
610
"net"
711
"strconv"
812
"sync"
@@ -16,12 +20,17 @@ type resolver struct {
1620
resolvers map[Lookup]bool
1721
lock sync.RWMutex
1822
dialer net.Dialer
23+
logger log.Logger
1924
}
2025

21-
func NewResolver(dialer net.Dialer, lookups ...Lookup) Resolver {
26+
func NewResolver(dialer net.Dialer, logger log.Logger, lookups ...Lookup) Resolver {
27+
if logger == nil {
28+
logger = logging.DefaultLogger()
29+
}
2230
r := &resolver{
2331
resolvers: make(map[Lookup]bool),
2432
dialer: dialer,
33+
logger: log.WithPrefix(logger, "component", "xresolver"),
2534
}
2635

2736
for _, lookup := range lookups {
@@ -70,7 +79,7 @@ func (resolve *resolver) getRoutes(ctx context.Context, host string) []Route {
7079
return routes
7180
}
7281

73-
func (resolve *resolver) DialContext(ctx context.Context, network, addr string) (con net.Conn, err error) {
82+
func (resolve *resolver) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
7483
host, port, err := net.SplitHostPort(addr)
7584
if err != nil {
7685
return nil, err
@@ -84,25 +93,27 @@ func (resolve *resolver) DialContext(ctx context.Context, network, addr string)
8493
routes := resolve.getRoutes(ctx, host)
8594

8695
// generate Conn or err from records
87-
con, err = resolve.createConnection(routes, network, port)
96+
con, route, err := resolve.createConnection(routes, network, port)
8897
if err == nil {
89-
return
98+
log.WithPrefix(resolve.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "successfully created connection using xresolver", "new-route", route.String(), "addr", addr)
99+
return con, err
90100
}
91101

102+
log.WithPrefix(resolve.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "failed to create connection with other routes using original address", "addr", addr)
92103
// if no connection, create using the default dialer
93104
return resolve.dialer.DialContext(ctx, network, addr)
94105
}
95106

96-
func (resolve *resolver) createConnection(routes []Route, network, port string) (con net.Conn, err error) {
107+
func (resolve *resolver) createConnection(routes []Route, network, port string) (net.Conn, Route, error) {
97108
for _, route := range routes {
98109
portUsed := port
99110
if route.Port != 0 {
100111
portUsed = strconv.Itoa(route.Port)
101112
}
102-
con, err = resolve.dialer.Dial(network, net.JoinHostPort(route.Host, portUsed))
113+
con, err := resolve.dialer.Dial(network, net.JoinHostPort(route.Host, portUsed))
103114
if err == nil {
104-
return
115+
return con, route, err
105116
}
106117
}
107-
return nil, errors.New("failed to create connection from routes")
118+
return nil, Route{}, errors.New("failed to create connection from routes")
108119
}

xresolver/xresolver_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"github.com/stretchr/testify/assert"
77
"github.com/stretchr/testify/mock"
8+
"github.com/xmidt-org/webpa-common/logging"
89
"io/ioutil"
910
"net/http"
1011
"net/http/httptest"
@@ -21,7 +22,7 @@ func TestClient(t *testing.T) {
2122

2223
client := &http.Client{
2324
Transport: &http.Transport{
24-
DialContext: NewResolver(DefaultDialer).DialContext,
25+
DialContext: NewResolver(DefaultDialer, logging.NewTestLogger(nil, t)).DialContext,
2526
},
2627
}
2728

@@ -62,7 +63,7 @@ func TestClientWithResolver(t *testing.T) {
6263

6364
fakeLookUp := new(mockLookUp)
6465
fakeLookUp.On("LookupRoutes", mock.Anything, customhost).Return([]Route{route}, nil)
65-
r := NewResolver(DefaultDialer, fakeLookUp)
66+
r := NewResolver(DefaultDialer, logging.NewTestLogger(nil, t), fakeLookUp)
6667

6768
client := &http.Client{
6869
Transport: &http.Transport{

0 commit comments

Comments
 (0)