Skip to content
This repository was archived by the owner on Jan 29, 2025. It is now read-only.

Commit 48c2d0f

Browse files
togashidmkillianmuldoon
authored andcommitted
Implement structured logging
Code updated with klogs and introduce the corresponding flag for the klog level in TAS deployment.
1 parent db967a0 commit 48c2d0f

File tree

20 files changed

+157
-103
lines changed

20 files changed

+157
-103
lines changed

extender/scheduler.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@ import (
55
"crypto/tls"
66
"crypto/x509"
77
"io/ioutil"
8-
"log"
98
"net/http"
109
"time"
10+
11+
"k8s.io/klog/v2"
1112
)
1213

1314
//postOnly check if the method type is POST
1415
func postOnly(next http.HandlerFunc) http.HandlerFunc {
1516
return func(w http.ResponseWriter, r *http.Request) {
1617
if r.Method != http.MethodPost {
1718
w.WriteHeader(http.StatusMethodNotAllowed)
18-
log.Print("method Type not POST")
19+
klog.V(2).InfoS("method Type not POST", "component", "extender")
1920
return
2021
}
2122
next.ServeHTTP(w, r)
@@ -27,7 +28,7 @@ func contentLength(next http.HandlerFunc) http.HandlerFunc {
2728
return func(w http.ResponseWriter, r *http.Request) {
2829
if r.ContentLength > 1*1000*1000*1000 {
2930
w.WriteHeader(http.StatusInternalServerError)
30-
log.Print("request size too large")
31+
klog.V(2).InfoS("request size too large", "component", "extender")
3132
return
3233
}
3334
next.ServeHTTP(w, r)
@@ -40,7 +41,7 @@ func requestContentType(next http.HandlerFunc) http.HandlerFunc {
4041
requestContentType := r.Header.Get("Content-Type")
4142
if requestContentType != "application/json" {
4243
w.WriteHeader(http.StatusNotFound)
43-
log.Print("request content type not application/json")
44+
klog.V(2).InfoS("request content type not application/json", "component", "extender")
4445
return
4546
}
4647
next.ServeHTTP(w, r)
@@ -72,7 +73,7 @@ func handlerWithMiddleware(handle http.HandlerFunc) http.HandlerFunc {
7273

7374
//error handler deals with requests sent to an invalid endpoint and returns a 404.
7475
func errorHandler(w http.ResponseWriter, r *http.Request) {
75-
log.Print("unknown path")
76+
klog.V(2).InfoS("Requested resource: '"+r.URL.Path+"' not found", "component", "extender")
7677
w.Header().Add("Content-Type", "application/json")
7778
w.WriteHeader(http.StatusNotFound)
7879
}
@@ -86,22 +87,23 @@ func (m Server) StartServer(port string, certFile string, keyFile string, caFile
8687
mx.HandleFunc("/scheduler/filter", handlerWithMiddleware(m.Filter))
8788
var err error
8889
if unsafe {
89-
log.Printf("Extender Listening on HTTP %v", port)
90+
klog.V(2).InfoS("Extender Listening on HTTP "+port, "component", "extender")
9091
err = http.ListenAndServe(":"+port, mx)
9192
} else {
92-
log.Printf("Extender Now Listening on HTTPS %v", port)
9393
srv := configureSecureServer(port, caFile)
9494
srv.Handler = mx
95-
log.Fatal(srv.ListenAndServeTLS(certFile, keyFile))
95+
klog.V(2).InfoS("Extender Listening on HTTPS "+port, "component", "extender")
96+
97+
klog.Fatal(srv.ListenAndServeTLS(certFile, keyFile))
9698
}
97-
log.Printf("Scheduler extender failed %v ", err)
99+
klog.V(2).InfoS("Scheduler extender server failed to start "+err.Error(), "component", "extender")
98100
}
99101

100102
//Configuration values including algorithms etc for the TAS scheduling endpoint.
101103
func configureSecureServer(port string, caFile string) *http.Server {
102104
caCert, err := ioutil.ReadFile(caFile)
103105
if err != nil {
104-
log.Fatal(err)
106+
klog.V(2).InfoS("caCert read failed: "+err.Error(), "component", "extender")
105107
}
106108
caCertPool := x509.NewCertPool()
107109
caCertPool.AppendCertsFromPEM(caCert)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ require (
66
k8s.io/api v0.20.2
77
k8s.io/apimachinery v0.20.2
88
k8s.io/client-go v0.20.2
9+
k8s.io/klog/v2 v2.4.0
910
k8s.io/metrics v0.20.2
1011
)

telemetry-aware-scheduling/cmd/main.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package main
22

33
import (
44
"flag"
5-
"log"
65
"os"
76

87
"os/signal"
@@ -22,6 +21,7 @@ import (
2221
"k8s.io/client-go/kubernetes"
2322
"k8s.io/client-go/rest"
2423
"k8s.io/client-go/tools/clientcmd"
24+
"k8s.io/klog/v2"
2525

2626
"context"
2727

@@ -31,6 +31,7 @@ import (
3131
func main() {
3232
var kubeConfig, port, certFile, keyFile, caFile, syncPeriod string
3333
var unsafe bool
34+
klog.InitFlags(nil)
3435
flag.StringVar(&kubeConfig, "kubeConfig", "/root/.kube/config", "location of kubernetes config file")
3536
flag.StringVar(&port, "port", "9001", "port on which the scheduler extender will listen")
3637
flag.StringVar(&certFile, "cert", "/etc/kubernetes/pki/ca.crt", "cert file extender will use for authentication")
@@ -44,22 +45,26 @@ func main() {
4445
sch := extender.Server{Scheduler: tscheduler}
4546
go sch.StartServer(port, certFile, keyFile, caFile, unsafe)
4647
tasController(kubeConfig, syncPeriod, cache)
48+
klog.Flush()
4749
}
4850

4951
//tasController The controller load the TAS policy/strategies and places them into a local cache that is available
5052
//to all TAS components. It also monitors the current state of policies.
5153
func tasController(kubeConfig string, syncPeriod string, cache *tascache.AutoUpdatingCache) {
5254
kubeClient, clientConfig, err := getkubeClient(kubeConfig)
5355
if err != nil {
56+
klog.V(2).InfoS("Issue in getting client config: "+err.Error(), "component", "controller")
5457
panic(err)
5558
}
5659
syncDuration, err := time.ParseDuration(syncPeriod)
5760
if err != nil {
61+
klog.V(2).InfoS("Sync problems in Parsing: "+err.Error(), "component", "controller")
5862
panic(err)
5963
}
6064
metricsClient := metrics.NewClient(clientConfig)
6165
telpolicyClient, _, err := telemetrypolicyclient.NewRest(*clientConfig)
6266
if err != nil {
67+
klog.V(2).InfoS("Rest client access to telemetrypolicy CRD problem: "+err.Error(), "component", "controller")
6368
panic(err)
6469
}
6570
metricTicker := time.NewTicker(syncDuration)
@@ -86,7 +91,7 @@ func tasController(kubeConfig string, syncPeriod string, cache *tascache.AutoUpd
8691
func getkubeClient(kubeConfig string) (kubernetes.Interface, *rest.Config, error) {
8792
clientConfig, err := rest.InClusterConfig()
8893
if err != nil {
89-
log.Print("not in cluster - trying file-based configuration")
94+
klog.V(4).InfoS("not in cluster - trying file-based configuration", "component", "controller")
9095
clientConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfig)
9196
if err != nil {
9297
return nil, nil, err
@@ -102,5 +107,5 @@ func getkubeClient(kubeConfig string) (kubernetes.Interface, *rest.Config, error
102107
func catchInterrupt(done chan os.Signal) {
103108
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
104109
<-done
105-
log.Println("\nPolicy controller closed ")
110+
klog.V(2).InfoS("Policy controller closed ", "component", "controller")
106111
}

telemetry-aware-scheduling/deploy/tas-deployment.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ spec:
2424
- --cert=/tas/cert/tls.crt
2525
- --key=/tas/cert/tls.key
2626
- --cacert=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
27+
- --v=2
2728
image: tasextender
2829
imagePullPolicy: IfNotPresent
2930
securityContext:

telemetry-aware-scheduling/pkg/cache/autoupdating.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ package cache
33
import (
44
"errors"
55
"fmt"
6-
"log"
76
"sync"
87
"time"
98

109
"github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/metrics"
1110
telemetrypolicy "github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/telemetrypolicy/api/v1alpha1"
11+
12+
"k8s.io/klog/v2"
1213
)
1314

1415
const (
@@ -50,7 +51,7 @@ func (n *AutoUpdatingCache) updateAllMetrics(client metrics.Client) {
5051
if len(name) > 0 {
5152
err := n.updateMetric(client, name)
5253
if err != nil {
53-
log.Print(err)
54+
klog.V(2).InfoS(err.Error(), "component", "controller")
5455
}
5556
} else {
5657
delete(n.metricMap, name)
@@ -117,7 +118,7 @@ func (n *AutoUpdatingCache) WriteMetric(metricName string, data metrics.NodeMetr
117118

118119
//DeletePolicy removes the policy removes the policy object at the given namespace/name string from the cache
119120
func (n *AutoUpdatingCache) DeletePolicy(namespace string, policyName string) error {
120-
log.Print("deleting", fmt.Sprintf(policyPath, namespace, policyName))
121+
klog.V(2).InfoS("deleting "+fmt.Sprintf(policyPath, namespace, policyName), "component", "controller")
121122
n.delete(fmt.Sprintf(policyPath, namespace, policyName))
122123
return nil
123124
}

telemetry-aware-scheduling/pkg/cache/autoupdating_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package cache
22

33
import (
4-
"log"
4+
"fmt"
55
"reflect"
66
"testing"
77
"time"
88

99
"github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/metrics"
1010
telemetrypolicy "github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/telemetrypolicy/api/v1alpha1"
11+
12+
"k8s.io/klog/v2"
1113
)
1214

1315
func TestNodeMetricsCache_PeriodicUpdate(t *testing.T) {
@@ -55,7 +57,8 @@ func TestNodeMetricsCache_PeriodicUpdate(t *testing.T) {
5557
t.Error(err)
5658
}
5759
if atStart[tt.queriedNode].Value == atEnd[tt.queriedNode].Value {
58-
log.Print(atStart[tt.queriedNode].Value, atEnd[tt.queriedNode].Value)
60+
msg := fmt.Sprint(atStart[tt.queriedNode].Value, atEnd[tt.queriedNode].Value)
61+
klog.InfoS(msg, "component", "testing")
5962
t.Fail()
6063
}
6164
})

telemetry-aware-scheduling/pkg/controller/controller.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ package controller
55
import (
66
"context"
77
"errors"
8-
"log"
8+
"fmt"
99

1010
strategy "github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/strategies/core"
1111
"github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/strategies/deschedule"
@@ -15,13 +15,15 @@ import (
1515
core "k8s.io/api/core/v1"
1616
"k8s.io/apimachinery/pkg/fields"
1717
"k8s.io/client-go/tools/cache"
18+
"k8s.io/klog/v2"
1819
)
1920

2021
//Run starts the controller watching on the Informer queue and doesnt' stop it until the Done signal is received from context
2122
func (controller *TelemetryPolicyController) Run(context context.Context) {
22-
log.Print("Watching Telemetry Policies ")
23+
klog.V(2).InfoS("Watching Telemetry Policies", "component", "controller")
2324
_, err := controller.watch(context)
2425
if err != nil {
26+
klog.V(2).InfoS(err.Error(), "component", "controller")
2527
panic(err)
2628
}
2729
<-context.Done()
@@ -54,20 +56,20 @@ func (controller *TelemetryPolicyController) watch(context context.Context) (cac
5456
func (controller *TelemetryPolicyController) onAdd(obj interface{}) {
5557
pol, ok := obj.(*telemetrypolicy.TASPolicy)
5658
if !ok {
57-
log.Print("cannot add policy: not recognized as a telemetry policy")
59+
klog.V(4).InfoS("cannot add policy: not recognized as a telemetry policy", "component", "controller")
5860
return
5961
}
6062
polCopy := pol.DeepCopy()
6163
err := controller.WritePolicy(polCopy.Namespace, polCopy.Name, *polCopy)
6264
if err != nil {
63-
log.Print("policy not added to cache: ", err)
65+
klog.V(2).InfoS("Policy not added to cache: "+err.Error(), "component", "controller")
6466
return
6567
}
6668
for _, name := range controller.Enforcer.RegisteredStrategyTypes() {
67-
log.Printf("registering %v from %v", name, pol.Name)
69+
klog.V(4).InfoS("registering "+name+" from "+pol.Name, "component", "controller")
6870
strt, err := castStrategy(name, polCopy.Spec.Strategies[name])
6971
if err != nil {
70-
log.Print(err)
72+
klog.V(2).InfoS(err.Error(), "component", "controller")
7173
return
7274
}
7375
strt.SetPolicyName(polCopy.ObjectMeta.Name)
@@ -76,11 +78,11 @@ func (controller *TelemetryPolicyController) onAdd(obj interface{}) {
7678
for _, rule := range ruleset[name].Rules {
7779
err := controller.WriteMetric(rule.Metricname, nil)
7880
if err == nil {
79-
log.Print("Added " + rule.Metricname)
81+
klog.V(2).InfoS("Added "+rule.Metricname, "component", "controller")
8082
}
8183
}
8284
}
83-
log.Println("Added policy,", polCopy.Name)
85+
klog.V(2).InfoS("Added policy, "+polCopy.Name, "component", "controller")
8486
}
8587

8688
//castStrategy takes in a TASpolicy and returns its specific type based on the structure of the policy file.
@@ -107,34 +109,35 @@ func (controller *TelemetryPolicyController) onUpdate(old, new interface{}) {
107109
polCopy := newPol.DeepCopy()
108110
err := controller.WritePolicy(polCopy.Namespace, polCopy.Name, *polCopy)
109111
if err != nil {
110-
log.Print("cached policy not updated: ", err)
112+
msg := fmt.Sprintf("cached policy not updated %v", err)
113+
klog.V(2).InfoS(msg, "component", "controller")
111114
return
112115
}
113-
log.Println("Policy: ", polCopy.Name, " updated")
116+
klog.V(2).InfoS("Policy: "+polCopy.Name+" updated", "component", "controller")
114117
for _, name := range controller.Enforcer.RegisteredStrategyTypes() {
115118
oldStrat, err := castStrategy(name, oldPol.Spec.Strategies[name])
116119
if err != nil {
117-
log.Print(err)
120+
klog.V(2).InfoS(err.Error(), "component", "controller")
118121
return
119122
}
120123
controller.Enforcer.RemoveStrategy(oldStrat, oldStrat.StrategyType())
121124
for _, rule := range oldPol.Spec.Strategies[oldStrat.StrategyType()].Rules {
122125
err := controller.DeleteMetric(rule.Metricname)
123126
if err != nil {
124-
log.Print(err)
127+
klog.V(2).InfoS(err.Error(), "component", "controller")
125128
}
126129
}
127130
strt, err := castStrategy(name, polCopy.Spec.Strategies[name])
128131
if err != nil {
129-
log.Print(err)
132+
klog.V(2).InfoS(err.Error(), "component", "controller")
130133
return
131134
}
132135
strt.SetPolicyName(polCopy.ObjectMeta.Name)
133136
controller.Enforcer.AddStrategy(strt, name)
134137
for _, rule := range polCopy.Spec.Strategies[name].Rules {
135138
err := controller.WriteMetric(rule.Metricname, nil)
136139
if err != nil {
137-
log.Print(err)
140+
klog.V(2).InfoS(err.Error(), "component", "controller")
138141
}
139142
}
140143
}
@@ -147,22 +150,22 @@ func (controller *TelemetryPolicyController) onDelete(obj interface{}) {
147150
for _, name := range controller.Enforcer.RegisteredStrategyTypes() {
148151
strt, err := castStrategy(name, polCopy.Spec.Strategies[name])
149152
if err != nil {
150-
log.Print(err)
153+
klog.V(2).InfoS(err.Error(), "component", "controller")
151154
return
152155
}
153156
strt.SetPolicyName(pol.Name)
154157
controller.Enforcer.RemoveStrategy(strt, strt.StrategyType())
155158
for _, rule := range polCopy.Spec.Strategies[strt.StrategyType()].Rules {
156159
err := controller.DeleteMetric(rule.Metricname)
157160
if err != nil {
158-
log.Print(err)
161+
klog.V(2).InfoS(err.Error(), "component", "controller")
159162
}
160163
}
161164
}
162165
err := controller.DeletePolicy(polCopy.Namespace, polCopy.Name)
163166
if err != nil {
164-
log.Print(err)
167+
klog.V(4).InfoS(err.Error(), "component", "controller")
165168
return
166169
}
167-
log.Println("Policy: ", polCopy.Name, " deleted")
170+
klog.V(2).InfoS("Policy: "+polCopy.Name+" deleted", "component", "controller")
168171
}

telemetry-aware-scheduling/pkg/metrics/client_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"bytes"
55
"fmt"
66
"io/ioutil"
7-
"log"
87
"reflect"
98
"testing"
109
"time"
1110

11+
"k8s.io/klog/v2"
12+
1213
"k8s.io/client-go/tools/clientcmd"
1314

1415
v1 "k8s.io/api/core/v1"
@@ -30,7 +31,8 @@ var baseTimeStamp = time.Date(2019, time.May, 20, 12, 25, 00, 0, time.UTC)
3031
func dummyRestClientConfig() *restclient.Config {
3132
tmpFile, err := ioutil.TempFile("", "cmdtests_temp")
3233
if err != nil {
33-
panic(fmt.Sprintf("unable to create a fake client config: %v", err))
34+
klog.InfoS("Unable to create a fake client config: "+err.Error(), "component", "testing")
35+
panic(err)
3436
}
3537
loadingRules := &clientcmd.ClientConfigLoadingRules{
3638
Precedence: []string{tmpFile.Name()},
@@ -41,7 +43,7 @@ func dummyRestClientConfig() *restclient.Config {
4143
clientConfig := clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, overrides, fallbackReader)
4244
restConfig, err := clientConfig.ClientConfig()
4345
if err != nil {
44-
log.Fatalf("Can't create dummy rest client config %v ", err)
46+
klog.InfoS("Can't create dummy rest client config: "+err.Error(), "component", "testing")
4547
}
4648
return restConfig
4749
}
@@ -135,7 +137,7 @@ func TestNewClient(t *testing.T) {
135137
t.Run(tt.name, func(t *testing.T) {
136138
got := NewClient(tt.args.config)
137139
if reflect.TypeOf(got) != reflect.TypeOf(dummyRestClientConfig()) {
138-
log.Print("No real test implemented here")
140+
klog.InfoS("No real test implemented here", "component", "testing")
139141
//TODO:add some better verification constructor has worked here.
140142
}
141143
})

0 commit comments

Comments
 (0)