This repository was archived by the owner on Jan 29, 2025. It is now read-only.

Commit db967a0

killianmuldoon authored and togashidm committed
Add e2e tests for Telemetry Aware Scheduling
End to end tests for the prioritize, filter, and deschedule strategies of TAS are used here as basic smoke tests to ensure the scheduler is operating as intended. These tests are designed for basic regression on new changes, but the framework, based on the Kubernetes in Docker (kind) set up, can be used for more extensive and specific end to end testing.
1 parent feb5025 commit db967a0

File tree

2 files changed (+328, -0 lines)


.github/e2e/e2e_test.go

Lines changed: 308 additions & 0 deletions
@@ -0,0 +1,308 @@
package e2e

import (
	"bytes"
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"path/filepath"
	"reflect"
	"testing"
	"time"

	"github.com/pkg/errors"

	"github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/metrics"
	"github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/strategies/deschedule"
	"github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/strategies/dontschedule"
	"github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/strategies/scheduleonmetric"
	api "github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/telemetrypolicy/api/v1alpha1"
	tasclient "github.com/intel/telemetry-aware-scheduling/telemetry-aware-scheduling/pkg/telemetrypolicy/client/v1alpha1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

/*
Metric values are currently set through a mounted file which is then read by the node exporter. This behaviour could
be changed in future to allow metrics to be set natively inside the e2e testing code. For this first iteration a new
metric and policy are used for each of the three e2e smoke tests.
*/

var (
	kubeConfigPath *string
	cl             *kubernetes.Clientset
	tascl          *tasclient.Client
	cm             metrics.CustomMetricsClient
)

// init sets up the clients used for the end to end tests.
func init() {
	if home := homedir.HomeDir(); home != "" {
		kubeConfigPath = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "path to your kubeconfig file")
	} else {
		kubeConfigPath = flag.String("kubeconfig", "", "absolute path to your kubeconfig file (required)")
	}
	config, err := clientcmd.BuildConfigFromFlags("", *kubeConfigPath)
	if err != nil {
		panic(err.Error())
	}

	// Create the clientset.
	cl, err = kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	cm = metrics.NewClient(config)

	tascl, err = tasclient.New(*config, "default")
	if err != nil {
		panic(err.Error())
	}
	// TODO: Replace the generic timeout with an explicit check for the custom metrics from the API server which times out after some period.
	err = waitForMetrics(120 * time.Second)
	if err != nil {
		panic(err.Error())
	}
}

var (
	prioritize1Policy = getTASPolicy("prioritize1", scheduleonmetric.StrategyType, "prioritize1_metric", "GreaterThan", 0)
	filter1Policy     = getTASPolicy("filter1", dontschedule.StrategyType, "filter1_metric", "LessThan", 20)
	filter2Policy     = getTASPolicy("filter2", dontschedule.StrategyType, "filter2_metric", "Equals", 0)
	deschedule1Policy = getTASPolicy("deschedule1", deschedule.StrategyType, "deschedule1_metric", "GreaterThan", 8)
)

// TestTASDeschedule will test the behaviour of a node with a listed deschedule policy in TAS.
func TestTASDeschedule(t *testing.T) {
	tests := map[string]struct {
		policy *api.TASPolicy
		want   map[string]bool
	}{
		"Label node for deschedule": {policy: deschedule1Policy, want: map[string]bool{"kind-worker2": true}},
	}
	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			res := map[string]bool{}
			log.Printf("Running: %v\n", name)
			// Defer a cleanup function to remove the policy after the test case.
			defer cleanup("", tc.policy.Name)
			_, err := tascl.Create(tc.policy)
			if err != nil {
				log.Print(err)
			}
			time.Sleep(time.Second * 5)
			lbls := metav1.LabelSelector{MatchLabels: map[string]string{deschedule1Policy.Name: "violating"}}

			nodes, err := cl.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Set(lbls.MatchLabels).String()})
			if err != nil {
				log.Print(err)
			}
			for _, n := range nodes.Items {
				res[n.Name] = true
			}
			if !reflect.DeepEqual(tc.want, res) {
				// Log full node specs and the TAS pod log if the test fails.
				nodes, _ = cl.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
				log.Print(tasLog())
				for _, n := range nodes.Items {
					log.Printf("%v labels: %v", n.Name, n.ObjectMeta.Labels)
				}
				t.Errorf("expected: %v, got: %v", tc.want, res)
			}
		})
	}
}

// TestTASFilter will test the behaviour of a pod with a listed filter/dontschedule policy in TAS.
func TestTASFilter(t *testing.T) {
	tests := map[string]struct {
		policy *api.TASPolicy
		pod    *v1.Pod
		want   string
	}{
		"Filter all but one node": {policy: filter1Policy, pod: podForPolicy(fmt.Sprintf("pod-%v", time.Now().Unix()), filter1Policy.Name), want: "kind-worker2"},
		"Filter all nodes":        {policy: filter2Policy, pod: podForPolicy(fmt.Sprintf("pod-%v", rand.String(8)), filter2Policy.Name), want: ""},
	}
	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			log.Printf("Running: %v\n", name)
			// Defer a cleanup function to remove the policy and pod after the test case.
			defer cleanup(tc.pod.Name, tc.policy.Name)

			_, err := tascl.Create(tc.policy)
			if err != nil {
				log.Print(err)
			}
			time.Sleep(time.Second * 5)
			_, err = cl.CoreV1().Pods("default").Create(context.TODO(), tc.pod, metav1.CreateOptions{})
			if err != nil {
				log.Print(err)
			}

			time.Sleep(time.Second * 5)
			p, _ := cl.CoreV1().Pods("default").Get(context.TODO(), tc.pod.Name, metav1.GetOptions{})
			log.Print(p.Name)
			if !reflect.DeepEqual(tc.want, p.Spec.NodeName) {
				t.Errorf("expected: %v, got: %v", tc.want, p.Spec.NodeName)
			}
		})
	}
}

// TestTASPrioritize will test the behaviour of a pod with a listed prioritize/scheduleonmetric policy in TAS.
func TestTASPrioritize(t *testing.T) {
	tests := map[string]struct {
		policy *api.TASPolicy
		pod    *v1.Pod
		want   string
	}{
		"Prioritize to highest score node": {policy: prioritize1Policy, pod: podForPolicy(fmt.Sprintf("pod-%v", time.Now().Unix()), prioritize1Policy.Name), want: "kind-worker2"},
	}
	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			log.Printf("Running: %v\n", name)
			// Defer a cleanup function to remove the policy and pod after the test case.
			defer cleanup(tc.pod.Name, tc.policy.Name)

			_, err := tascl.Create(tc.policy)
			if err != nil {
				log.Print(err)
			}
			time.Sleep(time.Second * 5)
			_, err = cl.CoreV1().Pods("default").Create(context.TODO(), tc.pod, metav1.CreateOptions{})
			if err != nil {
				log.Print(err)
			}
			time.Sleep(time.Second * 5)
			p, _ := cl.CoreV1().Pods("default").Get(context.TODO(), tc.pod.Name, metav1.GetOptions{})
			log.Print(p.Name)

			if !reflect.DeepEqual(tc.want, p.Spec.NodeName) {
				t.Errorf("expected: %v, got: %v", tc.want, p.Spec.NodeName)
			}
		})
	}
}

func podForPolicy(podName, policyName string) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      podName,
			Namespace: "default",
			Labels:    map[string]string{"telemetry-policy": policyName},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:    "test",
					Image:   "busybox",
					Command: []string{"/bin/sh", "-c", "sleep INF"},
					Resources: v1.ResourceRequirements{
						Limits: v1.ResourceList{"telemetry/scheduling": *resource.NewQuantity(1, resource.DecimalSI)},
					},
				},
			},
		},
	}
}

func cleanup(podName string, policyName string) {
	if podName != "" {
		err := cl.CoreV1().Pods("default").Delete(context.TODO(), podName, metav1.DeleteOptions{})
		if err != nil {
			log.Print(err.Error())
		}
	}
	err := tascl.Delete(policyName, &metav1.DeleteOptions{})
	if err != nil {
		log.Print(err.Error())
	}
}

func waitForMetrics(timeout time.Duration) error {
	t := time.Now().Add(timeout)
	var failureMessage error
	for time.Now().Before(t) {
		m, err := cm.GetNodeMetric("filter1_metric")
		if len(m) > 0 {
			log.Printf("Metrics returned with %v left before timeout: %v", time.Until(t), m)
			return nil
		}
		// Poll every two seconds until the metric appears or the timeout elapses.
		time.Sleep(2 * time.Second)
		failureMessage = err
	}
	return errors.Wrap(failureMessage, "request for custom metrics timed out")
}

// tasLog returns the log of the Telemetry Aware Scheduling pod as a string.
func tasLog() string {
	lbls := metav1.LabelSelector{MatchLabels: map[string]string{"app": "tas"}}

	pods, err := cl.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Set(lbls.MatchLabels).String()})
	if err != nil {
		return "error in getting config"
	}
	if len(pods.Items) <= 0 {
		return "TAS logs not found in the API; the TAS pod does not appear to be running"
	}
	pod := pods.Items[0]
	podLogOpts := v1.PodLogOptions{}
	req := cl.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
	podLogs, err := req.Stream(context.TODO())
	if err != nil {
		return "error in opening stream"
	}
	defer podLogs.Close()

	buf := new(bytes.Buffer)
	_, err = io.Copy(buf, podLogs)
	if err != nil {
		return "error in copying information from podLogs to buf"
	}
	str := buf.String()

	return str
}

func getTASPolicy(name string, str string, metric string, operator string, target int64) *api.TASPolicy {
	pol := &api.TASPolicy{
		TypeMeta:   metav1.TypeMeta{},
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"},
		Spec: api.TASPolicySpec{
			Strategies: map[string]api.TASPolicyStrategy{
				// A base dontschedule strategy is needed to make the scheduleonmetric policy work correctly.
				// TODO: This should be considered a bug.
				str: {
					PolicyName: name,
					Rules: []api.TASPolicyRule{
						{Metricname: metric, Operator: operator, Target: target},
					},
				},
			},
		},
	}
	if str != dontschedule.StrategyType {
		pol.Spec.Strategies[dontschedule.StrategyType] =
			api.TASPolicyStrategy{
				PolicyName: "filter1",
				Rules: []api.TASPolicyRule{
					{Metricname: "filter1_metric", Operator: "Equals", Target: 2000000},
				},
			}
	}
	return pol
}
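Since the commit message notes that this kind-based framework can be reused for more extensive end to end testing, the sketch below shows one way an additional test case might be written with the helpers in this file. It is illustrative only and not part of this commit: the policy name filter3, the metric filter3_metric, the threshold, and the expected node are hypothetical and assume the metric is exposed to the custom metrics API in the same way as filter1_metric.

// Hypothetical additional test case sketched against the helpers in this file.
// The policy "filter3", the metric "filter3_metric", and the expected node are
// illustrative assumptions, not values used by this commit.
func TestTASFilterAdditional(t *testing.T) {
	policy := getTASPolicy("filter3", dontschedule.StrategyType, "filter3_metric", "GreaterThan", 50)
	pod := podForPolicy(fmt.Sprintf("pod-%v", rand.String(8)), policy.Name)
	// Remove the pod and policy once the test case finishes.
	defer cleanup(pod.Name, policy.Name)

	if _, err := tascl.Create(policy); err != nil {
		t.Fatal(err)
	}
	time.Sleep(5 * time.Second)
	if _, err := cl.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
		t.Fatal(err)
	}
	time.Sleep(5 * time.Second)

	p, err := cl.CoreV1().Pods("default").Get(context.TODO(), pod.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatal(err)
	}
	if p.Spec.NodeName != "kind-worker" {
		t.Errorf("expected: kind-worker, got: %v", p.Spec.NodeName)
	}
}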
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
name: End to end tests

on: [push, pull_request]

jobs:
  end-to-end-test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2
      - name: Set up Go version
        uses: actions/setup-go@v1
        with:
          go-version: 1.13
      - name: Get tools for cluster installation
        run: ./.github/scripts/e2e_get_tools.sh
      - name: Set up cluster with TAS and custom metrics
        run: ./.github/scripts/e2e_setup_cluster.sh
      - name: Run end to end tests
        run: go test -v ./.github/e2e/e2e_test.go
