Skip to content
This repository was archived by the owner on Jan 29, 2025. It is now read-only.

Commit 2683ea1

Browse files
togashidmkillianmuldoon
authored andcommitted
Restructure logic into generic & specific Schedulers
- pkg/scheduler/scheduler.go specific logic to TAS were moved into new telemetryscheduler.go. Only generic functions/methods remained in scheduler pkg. - pkg/scheduler/types.go Server struc with ExtenderScheduler - cmd/tas-scheduler-extender/main.go new telemetry instance created to access the generic methods
1 parent bc5396c commit 2683ea1

File tree

4 files changed

+246
-236
lines changed

4 files changed

+246
-236
lines changed

cmd/tas-scheduler-extender/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"flag"
55
"github.com/intel/telemetry-aware-scheduling/pkg/cache"
66
"github.com/intel/telemetry-aware-scheduling/pkg/scheduler"
7+
"github.com/intel/telemetry-aware-scheduling/pkg/telemetryscheduler"
78
)
89

910
func main() {
@@ -19,6 +20,7 @@ func main() {
1920
flag.Parse()
2021
cacheReader := cache.RemoteClient{}
2122
cacheReader.RegisterEndpoint(cacheEndpoint)
22-
schedulerExtender := scheduler.NewMetricsExtender(&cacheReader)
23-
schedulerExtender.StartServer(port, certFile, keyFile, caFile, unsafe)
23+
telemetry := telemetryscheduler.NewMetricsExtender(&cacheReader)
24+
sch := scheduler.Server{ExtenderScheduler: telemetry}
25+
sch.StartServer(port, certFile, keyFile, caFile, unsafe)
2426
}

pkg/scheduler/scheduler.go

Lines changed: 1 addition & 234 deletions
Original file line numberDiff line numberDiff line change
@@ -4,245 +4,12 @@ package scheduler
44
import (
55
"crypto/tls"
66
"crypto/x509"
7-
"encoding/json"
8-
"errors"
9-
"fmt"
10-
"io/ioutil"
11-
12-
"github.com/intel/telemetry-aware-scheduling/pkg/cache"
13-
"github.com/intel/telemetry-aware-scheduling/pkg/metrics"
14-
"github.com/intel/telemetry-aware-scheduling/pkg/strategies/core"
15-
"github.com/intel/telemetry-aware-scheduling/pkg/strategies/dontschedule"
16-
"github.com/intel/telemetry-aware-scheduling/pkg/strategies/scheduleonmetric"
17-
telemetrypolicy "github.com/intel/telemetry-aware-scheduling/pkg/telemetrypolicy/api/v1alpha1"
18-
v1 "k8s.io/api/core/v1"
19-
207
"log"
218
"net/http"
229
"os"
23-
"strings"
2410
"time"
2511
)
2612

27-
//MetricsExtender holds information on the cache holding scheduling strategies and metrics.
28-
type MetricsExtender struct {
29-
cache cache.Reader
30-
}
31-
32-
//NewMetricsExtender returns a new metric Extender with the cache passed to it.
33-
func NewMetricsExtender(newCache cache.Reader) MetricsExtender {
34-
return MetricsExtender{
35-
cache: newCache,
36-
}
37-
}
38-
39-
//Does basic validation on the scheduling rule. returns the rule if it seems useful
40-
func (m MetricsExtender) getSchedulingRule(policy telemetrypolicy.TASPolicy) (telemetrypolicy.TASPolicyRule, error) {
41-
_, ok := policy.Spec.Strategies[scheduleonmetric.StrategyType]
42-
if ok && len(policy.Spec.Strategies[scheduleonmetric.StrategyType].Rules) > 0 {
43-
out := policy.Spec.Strategies[scheduleonmetric.StrategyType].Rules[0]
44-
if len(out.Metricname) > 0 {
45-
return out, nil
46-
}
47-
}
48-
return telemetrypolicy.TASPolicyRule{}, errors.New("no prioritize rule found for " + policy.Name)
49-
}
50-
51-
//Pulls the dontschedule strategy from a telemetry policy passed to it
52-
func (m MetricsExtender) getDontScheduleStrategy(policy telemetrypolicy.TASPolicy) (dontschedule.Strategy, error) {
53-
rawStrategy := policy.Spec.Strategies[dontschedule.StrategyType]
54-
if len(rawStrategy.Rules) == 0 {
55-
return dontschedule.Strategy{}, errors.New("no dontschedule strategy found")
56-
}
57-
strat := (dontschedule.Strategy)(rawStrategy)
58-
return strat, nil
59-
}
60-
61-
//prioritize nodes implements the logic for the prioritize scheduler call.
62-
func (m MetricsExtender) prioritizeNodes(args ExtenderArgs) *HostPriorityList {
63-
policy, err := m.getPolicyFromPod(&args.Pod)
64-
if err != nil {
65-
log.Print(err)
66-
return &HostPriorityList{}
67-
}
68-
scheduleRule, err := m.getSchedulingRule(policy)
69-
if err != nil {
70-
log.Print(err)
71-
return &HostPriorityList{}
72-
}
73-
chosenNodes, err := m.prioritizeNodesForRule(scheduleRule, args.Nodes)
74-
if err != nil {
75-
log.Print(err)
76-
return &HostPriorityList{}
77-
}
78-
log.Printf("node priorities returned: %v", chosenNodes)
79-
return &chosenNodes
80-
}
81-
82-
//prioritizeNodesForRule returns the nodes listed in order of priority after applying the appropriate telemetry rule rule.
83-
//Priorities are ordinal - there is no relationship between the outputted priorities and the metrics - simply an order of preference.
84-
func (m MetricsExtender) prioritizeNodesForRule(rule telemetrypolicy.TASPolicyRule, nodes *v1.NodeList) (HostPriorityList, error) {
85-
filteredNodeData := metrics.NodeMetricsInfo{}
86-
nodeData, err := m.cache.ReadMetric(rule.Metricname)
87-
if err != nil {
88-
return nil, fmt.Errorf("failed to prioritize: %v, %v ", err, rule.Metricname)
89-
}
90-
// Here we pull out nodes that have metrics but aren't in the filtered list
91-
for _, node := range nodes.Items {
92-
if v, ok := nodeData[node.Name]; ok {
93-
filteredNodeData[node.Name] = v
94-
}
95-
}
96-
outputNodes := HostPriorityList{}
97-
metricsOutput := fmt.Sprintf("%v for nodes: ", rule.Metricname)
98-
orderedNodes := core.OrderedList(filteredNodeData, rule.Operator)
99-
for i, node := range orderedNodes {
100-
metricsOutput = fmt.Sprint(metricsOutput, " [ ", node.NodeName, " :", node.MetricValue.AsDec(), "]")
101-
outputNodes = append(outputNodes, HostPriority{node.NodeName, 10 - i})
102-
}
103-
log.Print(metricsOutput)
104-
return outputNodes, nil
105-
}
106-
107-
//filterNodes takes in the arguments for the scheduler and filters nodes based on the pod's dontschedule strategy - if it has one in an attached policy.
108-
func (m MetricsExtender) filterNodes(args ExtenderArgs) *ExtenderFilterResult {
109-
availableNodeNames := ""
110-
filteredNodes := []v1.Node{}
111-
failedNodes := FailedNodesMap{}
112-
result := ExtenderFilterResult{}
113-
policy, err := m.getPolicyFromPod(&args.Pod)
114-
if err != nil {
115-
log.Print(err)
116-
return nil
117-
}
118-
strat, err := m.getDontScheduleStrategy(policy)
119-
if err != nil {
120-
log.Print(err)
121-
return nil
122-
}
123-
violatingNodes := strat.Violated(m.cache)
124-
if len(args.Nodes.Items) == 0 {
125-
log.Print("No nodes to compare ")
126-
return nil
127-
}
128-
for _, node := range args.Nodes.Items {
129-
if _, ok := violatingNodes[node.Name]; ok {
130-
failedNodes[node.Name] = strings.Join([]string{"Node violates"}, policy.Name)
131-
} else {
132-
filteredNodes = append(filteredNodes, node)
133-
availableNodeNames = availableNodeNames + node.Name + " "
134-
}
135-
}
136-
nodeNames := strings.Split(availableNodeNames, " ")
137-
result = ExtenderFilterResult{
138-
Nodes: &v1.NodeList{
139-
Items: filteredNodes,
140-
},
141-
NodeNames: &nodeNames,
142-
FailedNodes: failedNodes,
143-
Error: "",
144-
}
145-
if len(availableNodeNames) > 0 {
146-
log.Printf("Filtered nodes for %v : %v", policy.Name, availableNodeNames)
147-
}
148-
return &result
149-
}
150-
151-
//getPolicyFromPod returns the policy associated with a pod, if declared, from the api.
152-
func (m MetricsExtender) getPolicyFromPod(pod *v1.Pod) (telemetrypolicy.TASPolicy, error) {
153-
if policyName, ok := pod.Labels["telemetry-policy"]; ok {
154-
policy, err := m.cache.ReadPolicy(pod.Namespace, policyName)
155-
if err != nil {
156-
return telemetrypolicy.TASPolicy{}, err
157-
}
158-
return policy, nil
159-
}
160-
return telemetrypolicy.TASPolicy{}, fmt.Errorf("no policy found in pod spec for pod %v", pod.Name)
161-
}
162-
163-
//prescheduleChecks performs checks to ensure a pod is suitable for the extender.
164-
//this method will return pods as supplied if they have no declared policy
165-
func (m MetricsExtender) prescheduleChecks(w http.ResponseWriter, r *http.Request) (ExtenderArgs, http.ResponseWriter, error) {
166-
extenderArgs, err := m.decodeExtenderRequest(r)
167-
if err != nil {
168-
log.Printf("cannot decode request %v", err)
169-
return ExtenderArgs{}, w, err
170-
}
171-
if _, ok := extenderArgs.Pod.Labels["telemetry-policy"]; !ok {
172-
err = fmt.Errorf("no policy associated with pod")
173-
w.WriteHeader(http.StatusBadRequest)
174-
return ExtenderArgs{}, w, err
175-
}
176-
return extenderArgs, w, err
177-
}
178-
179-
//decodeExtenderRequest reads the json request into the expected struct.
180-
//It returns an error of the request is not in the required format.
181-
func (m MetricsExtender) decodeExtenderRequest(r *http.Request) (ExtenderArgs, error) {
182-
var args ExtenderArgs
183-
if r.Body == nil {
184-
return args, fmt.Errorf("request body empty")
185-
}
186-
decoder := json.NewDecoder(r.Body)
187-
if err := decoder.Decode(&args); err != nil {
188-
return args, fmt.Errorf("error decoding request: %v", err)
189-
}
190-
err := r.Body.Close()
191-
if err != nil {
192-
return args, err
193-
}
194-
return args, nil
195-
}
196-
197-
//writeFilterResponse takes the ExtenderFilterResults struct and writes it as a http response if valid.
198-
func (m MetricsExtender) writeFilterResponse(w http.ResponseWriter, result *ExtenderFilterResult) {
199-
encoder := json.NewEncoder(w)
200-
if err := encoder.Encode(result); err != nil {
201-
http.Error(w, "Encode error", http.StatusBadRequest)
202-
}
203-
}
204-
205-
//Write out the results of prioritize in the response to the scheduler.
206-
func (m MetricsExtender) writePrioritizeResponse(w http.ResponseWriter, result *HostPriorityList) {
207-
encoder := json.NewEncoder(w)
208-
if err := encoder.Encode(result); err != nil {
209-
http.Error(w, "Encode error ", http.StatusBadRequest)
210-
}
211-
}
212-
213-
//Prioritize manages all prioritize requests from the scheduler extender.
214-
//It decodes the package, checks its policy, and performs error checking.
215-
//It then calls the prioritize logic and writes a response to the scheduler.
216-
func (m MetricsExtender) Prioritize(w http.ResponseWriter, r *http.Request) {
217-
log.Print("Received prioritize request")
218-
extenderArgs, w, err := m.prescheduleChecks(w, r)
219-
if err != nil {
220-
log.Printf("failed to prioritze %v", err)
221-
return
222-
}
223-
prioritizedNodes := m.prioritizeNodes(extenderArgs)
224-
if prioritizedNodes == nil {
225-
w.WriteHeader(http.StatusNotFound)
226-
}
227-
m.writePrioritizeResponse(w, prioritizedNodes)
228-
}
229-
230-
//Filter manages all filter requests from the scheduler. It decodes the request, checks its policy and registers it.
231-
//It then calls the filter logic and writes a response to the scheduler.
232-
func (m MetricsExtender) Filter(w http.ResponseWriter, r *http.Request) {
233-
log.Print("filter request recieved")
234-
extenderArgs, w, err := m.prescheduleChecks(w, r)
235-
if err != nil {
236-
log.Printf("cannot filter %v", err)
237-
return
238-
}
239-
filteredNodes := m.filterNodes(extenderArgs)
240-
if filteredNodes == nil {
241-
log.Print("No filtered nodes returned")
242-
w.WriteHeader(http.StatusNotFound)
243-
}
244-
m.writeFilterResponse(w, filteredNodes)
245-
}
24613
//postOnly check if the method type is POST
24714
func postOnly(next http.HandlerFunc) http.HandlerFunc {
24815
return func(w http.ResponseWriter, r *http.Request) {
@@ -308,7 +75,7 @@ func checkSymLinks(filename string) error {
30875

30976
// StartServer starts the HTTP server needed for scheduler.
31077
// It registers the handlers and checks for existing telemetry policies.
311-
func (m MetricsExtender) StartServer(port string, certFile string, keyFile string, caFile string, unsafe bool) {
78+
func (m Server) StartServer(port string, certFile string, keyFile string, caFile string, unsafe bool) {
31279
http.HandleFunc("/", handlerWithMiddleware(errorHandler))
31380
http.HandleFunc("/scheduler/prioritize", handlerWithMiddleware(m.Prioritize))
31481
http.HandleFunc("/scheduler/filter", handlerWithMiddleware(m.Filter))

pkg/scheduler/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ type ExtenderScheduler interface {
1111
Filter(w http.ResponseWriter, r *http.Request)
1212
}
1313

14+
//Server declares ExtenderScheduler
15+
type Server struct {
16+
ExtenderScheduler
17+
}
18+
1419
//TODO: These types are in the k8s.io/kubernes/scheduler/api package
1520
// Some import issue is making them tough to access, so they are reimplemented here pending a solution.
1621

0 commit comments

Comments
 (0)