Skip to content

Commit fa39730

Browse files
authored
x/ref/lib/aws/vxray: add support for obtaining eks/k8s data (#206)
This PR adds support for obtaining the EKS/Kubernetes (k8s) cluster name, container id, etc. Note that this PR is not specific to EKS and can be used with other k8s systems. It relies on the convention that the k8s cluster is configured to write a configmap with the cluster's name as one of the key/value pairs. The vxray setup allows both the name of the configmap and the key for the cluster name to be configured.
1 parent 99e7c58 commit fa39730

File tree

8 files changed

+307
-40
lines changed

8 files changed

+307
-40
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
FROM alpine
2+
COPY echo /bin/echo
3+
COPY creds/ /bin/creds/
4+
ENTRYPOINT ["/bin/echo"]
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
FROM alpine
2+
COPY echod /bin/echod
3+
COPY creds/ /bin/creds/
4+
ENTRYPOINT ["/bin/echod"]
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
FROM alpine
2+
COPY mounttabled /bin/mounttabled
3+
COPY creds/ /bin/creds/
4+
ENTRYPOINT ["/bin/mounttabled"]

x/ref/examples/echo/echo/echo.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package main
77

88
import (
9+
"errors"
910
"flag"
1011
"fmt"
1112
"io"
@@ -18,6 +19,7 @@ import (
1819
"github.com/aws/aws-xray-sdk-go/xray"
1920
v23 "v.io/v23"
2021
"v.io/v23/context"
22+
"v.io/v23/naming"
2123
"v.io/v23/vtrace"
2224
"v.io/x/ref/examples/echo"
2325
"v.io/x/ref/lib/aws/vxray"
@@ -50,7 +52,20 @@ func main() {
5052
ctx, shutdown := v23.Init()
5153
defer shutdown()
5254

53-
ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true))
55+
ctx, _ = vxray.InitXRay(ctx,
56+
v23.GetRuntimeFlags().VtraceFlags,
57+
xray.Config{ServiceVersion: ""},
58+
vxray.EC2Plugin(),
59+
vxray.EKSCluster(),
60+
vxray.ContainerIDAndHost(),
61+
vxray.MergeLogging(true))
62+
63+
servers := strings.Split(serverFlag, ",")
64+
if len(servers) > 0 {
65+
ctx.Infof("waiting for: %v servers: %v", len(servers), servers)
66+
waitForServers(ctx, servers)
67+
ctx.Infof("servers ready: %v", servers)
68+
}
5469

5570
client := echo.EchoServiceClient(nameFlag)
5671

@@ -59,7 +74,7 @@ func main() {
5974
if len(httpAddr) > 0 {
6075
wg.Add(1)
6176
go func() {
62-
runHttpServer(ctx, httpAddr, client)
77+
runHTTPServer(ctx, httpAddr, client)
6378
wg.Done()
6479
}()
6580
}
@@ -73,7 +88,6 @@ func main() {
7388
close(done)
7489
}()
7590

76-
servers := strings.Split(serverFlag, ",")
7791
samplingRequest := &vtrace.SamplingRequest{
7892
Name: nameFlag,
7993
}
@@ -106,7 +120,7 @@ func main() {
106120
}()
107121
select {
108122
case <-done:
109-
time.Sleep(1)
123+
time.Sleep(time.Second * 2)
110124
case <-signals.ShutdownOnSignals(ctx):
111125
}
112126
}
@@ -115,7 +129,7 @@ func main() {
115129
// <addr>/call and <addr>/call?forward-to=<server>
116130
// will issue RPCs to the echo server.
117131
// <addr>/quit will cause the client to exit gracefully.
118-
func runHttpServer(ctx *context.T, addr string, client echo.EchoServiceClientStub) {
132+
func runHTTPServer(ctx *context.T, addr string, client echo.EchoServiceClientStub) {
119133
xrayHandler := xray.Handler(
120134
xray.NewFixedSegmentNamer("http.echo.client"),
121135
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -169,7 +183,7 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer,
169183
}
170184
result, err := client.Ping(ctx, now, servers)
171185
if err != nil {
172-
ctx.Errorf("%v.%v failed: %v", nameFlag, "ping", err)
186+
ctx.Errorf("%v.%v failed: %v", servers, "ping", err)
173187
}
174188
if len(result) < 100 {
175189
fmt.Fprintln(out, result)
@@ -178,3 +192,27 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer,
178192
}
179193
return err
180194
}
195+
196+
func waitForServers(ctx *context.T, servers []string) {
197+
var wg sync.WaitGroup
198+
wg.Add(len(servers))
199+
ns := v23.GetNamespace(ctx)
200+
for _, server := range servers {
201+
go func(server string) {
202+
for {
203+
_, err := ns.Resolve(ctx, server)
204+
ctx.Infof("%v: %v: %v", server, err, errors.Is(err, naming.ErrNoSuchName))
205+
if errors.Is(err, naming.ErrNoSuchName) {
206+
time.Sleep(time.Second)
207+
continue
208+
}
209+
if err == nil {
210+
break
211+
}
212+
ctx.Infof("%v: %v\n", server, err)
213+
}
214+
wg.Done()
215+
}(server)
216+
}
217+
wg.Wait()
218+
}

x/ref/examples/echo/echod/echod.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,13 @@ func main() {
7676
ctx, shutdown := v23.Init()
7777
defer shutdown()
7878

79-
ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true))
79+
ctx, _ = vxray.InitXRay(ctx,
80+
v23.GetRuntimeFlags().VtraceFlags,
81+
xray.Config{ServiceVersion: ""},
82+
vxray.EC2Plugin(),
83+
vxray.EKSCluster(),
84+
vxray.ContainerIDAndHost(),
85+
vxray.MergeLogging(true))
8086

8187
ctx, server, err := v23.WithNewServer(ctx, nameFlag, echo.EchoServiceServer(&echod{}), securityflag.NewAuthorizerOrDie(ctx))
8288
if err != nil {

x/ref/lib/aws/vxray/config.go

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package vxray
99

1010
import (
1111
"fmt"
12+
"os"
1213

1314
"github.com/aws/aws-xray-sdk-go/awsplugins/beanstalk"
1415
"github.com/aws/aws-xray-sdk-go/awsplugins/ec2"
@@ -18,15 +19,18 @@ import (
1819
"v.io/v23/context"
1920
"v.io/v23/logging"
2021
"v.io/v23/vtrace"
22+
"v.io/x/ref/lib/aws/vxray/internal"
2123
"v.io/x/ref/lib/flags"
2224
libvtrace "v.io/x/ref/lib/vtrace"
2325
)
2426

2527
type options struct {
26-
mergeLogging bool
27-
mapToHTTP bool
28-
newStore bool
29-
newStoreFlags flags.VtraceFlags
28+
mergeLogging bool
29+
mapToHTTP bool
30+
newStore bool
31+
newStoreFlags flags.VtraceFlags
32+
configMap, configMapKey string
33+
containerized bool
3034
}
3135

3236
// Option represents an option to InitXRay.
@@ -53,6 +57,44 @@ func BeanstalkPlugin() Option {
5357
}
5458
}
5559

60+
// KubernetesCluster configures obtaining information about the process'
61+
// current environment when running under Kubernetes (k8s), whether managed by
62+
// AWS EKS or any other control plane implementation. It requires that the
63+
// K8S configuration creates a configmap that contains the cluster name.
64+
// The configMap argument names that configmap and configMapKey
65+
// is the key in that configmap for the cluster name. For example, when using
66+
// the AWS cloudwatch/insights/xray-daemon daemonset the values for those
67+
// would be:
68+
// /api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info
69+
// cluster.name
70+
//
71+
// When configured, xray segments will contain a 'cluster_name' annotation.
72+
func KubernetesCluster(configMap, configMapKey string) Option {
73+
return func(o *options) {
74+
o.configMap, o.configMapKey = configMap, configMapKey
75+
}
76+
}
77+
78+
// EKSCluster calls KubernetesCluster with the values commonly used
79+
// with EKS clusters.
80+
func EKSCluster() Option {
81+
return KubernetesCluster("/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info", "cluster.name")
82+
}
83+
84+
// ContainerIDAndHost requests that container id and host information be
85+
// obtained and added to traces. The container id is obtained by parsing
86+
// the /proc/self/cgroup file, and the host by calling the operating system's
87+
// hostname function. When running under kubernetes for example, a pod's
88+
// name is configured as its hostname.
89+
//
90+
// When configured, xray segments will contain 'container_id' and 'container_host'
91+
// annotations.
92+
func ContainerIDAndHost() Option {
93+
return func(o *options) {
94+
o.containerized = true
95+
}
96+
}
97+
5698
// MergeLogging arranges for xray logging messages to be merged with vanadium
5799
// log messages.
58100
func MergeLogging(v bool) Option {
@@ -105,44 +147,73 @@ func (xl *xraylogger) Log(level xraylog.LogLevel, msg fmt.Stringer) {
105147
}
106148
}
107149

108-
func initXRay(ctx *context.T, config xray.Config, opts []Option) (*context.T, *options, error) {
109-
o := &options{mapToHTTP: true}
110-
for _, fn := range opts {
111-
fn(o)
112-
}
150+
func (m *manager) initXRay(ctx *context.T, config xray.Config) (*context.T, error) {
113151
if err := xray.Configure(config); err != nil {
114152
ctx.Errorf("failed to configure xray context: %v", err)
115-
return ctx, nil, err
153+
return ctx, err
116154
}
117-
if o.mergeLogging {
155+
if m.options.mergeLogging {
118156
xray.SetLogger(&xraylogger{context.LoggerFromContext(ctx)})
119157
}
120158
ctx, err := WithConfig(ctx, config)
121-
return ctx, o, err
159+
return ctx, err
122160
}
123161

124162
// InitXRay configures the AWS xray service and returns a context containing
125163
// the xray configuration. This should only be called once. The vflags argument
126164
// is used solely to check if xray tracing is enabled and not to create a
127-
// new vtrace.Store, if a new store is required, the
165+
// new vtrace.Store; if a new/alternate store is required, the WithNewStore option
166+
// should be used to specify the store to be used.
128167
func InitXRay(ctx *context.T, vflags flags.VtraceFlags, config xray.Config, opts ...Option) (*context.T, error) {
129168
if !vflags.EnableAWSXRay {
130169
return ctx, nil
131170
}
132171
octx := ctx
133-
ctx, options, err := initXRay(ctx, config, opts)
172+
mgr := &manager{}
173+
mgr.options.mapToHTTP = true
174+
for _, fn := range opts {
175+
fn(&mgr.options)
176+
}
177+
ctx, err := mgr.initXRay(ctx, config)
134178
if err != nil {
135179
return octx, err
136180
}
137-
138-
if options.newStore {
139-
store, err := libvtrace.NewStore(options.newStoreFlags)
181+
if mgr.options.newStore {
182+
store, err := libvtrace.NewStore(mgr.options.newStoreFlags)
140183
if err != nil {
141184
return octx, err
142185
}
143186
ctx = vtrace.WithStore(ctx, store)
144187
}
145-
mgr := &manager{mapToHTTP: options.mapToHTTP}
188+
if mgr.options.containerized {
189+
if hostNameErr == nil {
190+
mgr.containerHost = hostName
191+
} else {
192+
ctx.Infof("failed to obtain host name from: %v", hostNameErr)
193+
}
194+
cgroupFile := "/proc/self/cgroup"
195+
if cid, err := internal.GetContainerID(cgroupFile); err == nil {
196+
mgr.containerID = cid
197+
} else {
198+
ctx.Infof("failed to obtain container id", err)
199+
}
200+
}
201+
if cm := mgr.options.configMap; len(cm) > 0 {
202+
if clusterName, err := internal.GetEKSClusterName(ctx, cm, mgr.options.configMapKey); err == nil {
203+
mgr.clusterName = clusterName
204+
} else {
205+
ctx.Infof("failed to obtain cluster name from %v.%v: %v", cm, mgr.options.configMapKey, err)
206+
}
207+
}
146208
ctx = vtrace.WithManager(ctx, mgr)
147209
return ctx, nil
148210
}
211+
212+
var (
213+
hostName string
214+
hostNameErr error
215+
)
216+
217+
func init() {
218+
hostName, hostNameErr = os.Hostname()
219+
}

0 commit comments

Comments
 (0)