Skip to content

Commit 05125ce

Browse files
authored
Ensure that the in-cluster is always scheduled on replica-0 (#11)
1 parent 226285f commit 05125ce

File tree

3 files changed

+130
-3
lines changed

3 files changed

+130
-3
lines changed

api/autoscaler/common/load_index.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ func (s LoadIndexesDesc) Swap(i, j int) {
4343
s[i], s[j] = s[j], s[i]
4444
}
4545

46+
// Less returns true if the value of the LoadIndex at index i is greater than the value of the LoadIndex at index j.
47+
// Note that this is reversed because this is a descending sort implementation.
4648
func (s LoadIndexesDesc) Less(i, j int) bool {
4749
return s[i].Value.AsApproximateFloat64() > s[j].Value.AsApproximateFloat64()
4850
}

partitioners/longestprocessingtime/longestprocessingtime.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,10 @@ func (r *PartitionerImpl) Partition(ctx context.Context,
4242
return replicas, nil
4343
}
4444

45-
sort.Sort(common.LoadIndexesDesc(shards))
46-
bucketSize := shards[0].Value.AsApproximateFloat64()
45+
sortedShards, bucketSize := r.Sort(shards)
4746

4847
replicaCount := int32(0)
49-
for _, shard := range shards {
48+
for _, shard := range sortedShards {
5049
// Find the replica with the least current load.
5150
minLoad := float64(0)
5251
selectedReplicaIndex := -1
@@ -89,3 +88,27 @@ func (r *PartitionerImpl) Partition(ctx context.Context,
8988

9089
return replicas, nil
9190
}
91+
92+
func (r *PartitionerImpl) Sort(shards []common.LoadIndex) ([]common.LoadIndex, float64) {
93+
if len(shards) == 0 {
94+
return shards, 0
95+
}
96+
97+
sortedShards := common.LoadIndexesDesc(shards)
98+
sort.Sort(sortedShards)
99+
biggestLoad := float64(sortedShards[0].Value.AsApproximateFloat64())
100+
101+
inClusterCondition := func(loadIndex common.LoadIndex) bool {
102+
return loadIndex.Shard.Name == "in-cluster"
103+
}
104+
inClusterIndex := sort.Search(len(sortedShards), func(i int) bool {
105+
return inClusterCondition(sortedShards[i])
106+
})
107+
if inClusterIndex < len(sortedShards) && inClusterCondition(sortedShards[inClusterIndex]) {
108+
inClusterShard := sortedShards[inClusterIndex]
109+
copy(sortedShards[inClusterIndex:], sortedShards[inClusterIndex+1:])
110+
sortedShards = sortedShards[:len(sortedShards)-1]
111+
return append(common.LoadIndexesDesc{inClusterShard}, sortedShards...), biggestLoad
112+
}
113+
return sortedShards, biggestLoad
114+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package longestprocessingtime
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
. "github.com/onsi/ginkgo/v2"
8+
. "github.com/onsi/gomega"
9+
"github.com/plumber-cd/argocd-autoscaler/api/autoscaler/common"
10+
"k8s.io/apimachinery/pkg/api/resource"
11+
)
12+
13+
func TestControllers(t *testing.T) {
14+
RegisterFailHandler(Fail)
15+
16+
RunSpecs(t, "LongestProcessingTimePartitioner Suite")
17+
}
18+
19+
var _ = Describe("LongestProcessingTimePartitioner", func() {
20+
Context("when unsorted list of shards", func() {
21+
// nolint:dupl
22+
It("should sort", func() {
23+
shards := []common.LoadIndex{
24+
{Shard: common.Shard{Name: "shard1"}, Value: resource.MustParse("5")},
25+
{Shard: common.Shard{Name: "shard2"}, Value: resource.MustParse("3")},
26+
{Shard: common.Shard{Name: "shard3"}, Value: resource.MustParse("8")},
27+
{Shard: common.Shard{Name: "shard4"}, Value: resource.MustParse("6")},
28+
{Shard: common.Shard{Name: "shard5"}, Value: resource.MustParse("7")},
29+
}
30+
partitioner := PartitionerImpl{}
31+
sortedShards, bucketSize := partitioner.Sort(shards)
32+
Expect(bucketSize).To(Equal(float64(8)))
33+
Expect(sortedShards).To(HaveLen(len(shards)))
34+
Expect(sortedShards[0].Shard.Name).To(Equal("shard3"))
35+
Expect(sortedShards[1].Shard.Name).To(Equal("shard5"))
36+
Expect(sortedShards[2].Shard.Name).To(Equal("shard4"))
37+
Expect(sortedShards[3].Shard.Name).To(Equal("shard1"))
38+
Expect(sortedShards[4].Shard.Name).To(Equal("shard2"))
39+
})
40+
41+
// nolint:dupl
42+
It("should put in-cluster first", func() {
43+
shards := []common.LoadIndex{
44+
{Shard: common.Shard{Name: "shard1"}, Value: resource.MustParse("5")},
45+
{Shard: common.Shard{Name: "shard2"}, Value: resource.MustParse("3")},
46+
{Shard: common.Shard{Name: "shard3"}, Value: resource.MustParse("8")},
47+
{Shard: common.Shard{Name: "in-cluster"}, Value: resource.MustParse("6")},
48+
{Shard: common.Shard{Name: "shard5"}, Value: resource.MustParse("7")},
49+
}
50+
partitioner := PartitionerImpl{}
51+
sortedShards, bucketSize := partitioner.Sort(shards)
52+
Expect(bucketSize).To(Equal(float64(8)))
53+
Expect(sortedShards).To(HaveLen(len(shards)))
54+
Expect(sortedShards[0].Shard.Name).To(Equal("in-cluster"))
55+
Expect(sortedShards[1].Shard.Name).To(Equal("shard3"))
56+
Expect(sortedShards[2].Shard.Name).To(Equal("shard5"))
57+
Expect(sortedShards[3].Shard.Name).To(Equal("shard1"))
58+
Expect(sortedShards[4].Shard.Name).To(Equal("shard2"))
59+
})
60+
})
61+
62+
Context("when partitioning", func() {
63+
It("should partition", func() {
64+
shards := []common.LoadIndex{
65+
{Shard: common.Shard{Name: "shard1"}, Value: resource.MustParse("5")},
66+
{Shard: common.Shard{Name: "shard2"}, Value: resource.MustParse("3")},
67+
{Shard: common.Shard{Name: "shard3"}, Value: resource.MustParse("10")},
68+
{Shard: common.Shard{Name: "in-cluster"}, Value: resource.MustParse("6")},
69+
{Shard: common.Shard{Name: "shard5"}, Value: resource.MustParse("7")},
70+
}
71+
partitioner := PartitionerImpl{}
72+
replicas, err := partitioner.Partition(context.TODO(), shards)
73+
Expect(err).ToNot(HaveOccurred())
74+
Expect(replicas).To(HaveLen(4))
75+
76+
Expect(replicas[0].ID).To(Equal(int32(0)))
77+
Expect(replicas[0].LoadIndexes).To(HaveLen(1))
78+
Expect(replicas[0].LoadIndexes[0].Shard.Name).To(Equal("in-cluster"))
79+
Expect(replicas[0].TotalLoad.AsApproximateFloat64()).To(Equal(float64(6)))
80+
Expect(replicas[0].TotalLoadDisplayValue).To(Equal("6"))
81+
82+
Expect(replicas[1].ID).To(Equal(int32(1)))
83+
Expect(replicas[1].LoadIndexes).To(HaveLen(1))
84+
Expect(replicas[1].LoadIndexes[0].Shard.Name).To(Equal("shard3"))
85+
Expect(replicas[1].TotalLoad.AsApproximateFloat64()).To(Equal(float64(10)))
86+
Expect(replicas[1].TotalLoadDisplayValue).To(Equal("10"))
87+
88+
Expect(replicas[2].ID).To(Equal(int32(2)))
89+
Expect(replicas[2].LoadIndexes).To(HaveLen(1))
90+
Expect(replicas[2].LoadIndexes[0].Shard.Name).To(Equal("shard5"))
91+
Expect(replicas[2].TotalLoad.AsApproximateFloat64()).To(Equal(float64(7)))
92+
Expect(replicas[2].TotalLoadDisplayValue).To(Equal("7"))
93+
94+
Expect(replicas[3].ID).To(Equal(int32(3)))
95+
Expect(replicas[3].LoadIndexes).To(HaveLen(2))
96+
Expect(replicas[3].LoadIndexes[0].Shard.Name).To(Equal("shard1"))
97+
Expect(replicas[3].LoadIndexes[1].Shard.Name).To(Equal("shard2"))
98+
Expect(replicas[3].TotalLoad.AsApproximateFloat64()).To(Equal(float64(8)))
99+
Expect(replicas[3].TotalLoadDisplayValue).To(Equal("8"))
100+
})
101+
})
102+
})

0 commit comments

Comments
 (0)