Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 52 additions & 17 deletions internal/controller/imageprefetch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,41 +195,76 @@ func getNodeNames(nodes []corev1.Node) []string {
}

func selectNodesByReplicas(imgPrefetch *ofenv1.ImagePrefetch, readyNodes []corev1.Node) ([]string, error) {
var selectNodes []string
var selectNodes []corev1.Node
targetReplicas := imgPrefetch.Spec.Replicas

if len(readyNodes) < targetReplicas {
return nil, fmt.Errorf("not enough nodes available: %d < %d", len(readyNodes), targetReplicas)
}

readyNodesMap := make(map[string]corev1.Node)
for _, node := range readyNodes {
readyNodesMap[node.Name] = node
}

readyNodesName := getNodeNames(readyNodes)
for _, node := range imgPrefetch.Status.SelectedNodes {
for _, nodeName := range imgPrefetch.Status.SelectedNodes {
if len(selectNodes) >= targetReplicas {
break
return getNodeNames(selectNodes), nil
}

if slices.Contains(readyNodesName, node) {
selectNodes = append(selectNodes, node)
if slices.Contains(readyNodesName, nodeName) {
selectNodes = append(selectNodes, readyNodesMap[nodeName])
}
}

if len(selectNodes) < targetReplicas {
sort.Slice(readyNodes, func(i, j int) bool {
return len(readyNodes[i].Status.Images) < len(readyNodes[j].Status.Images)
for i := 0; i < len(readyNodes); i++ {
if len(selectNodes) >= targetReplicas {
return getNodeNames(selectNodes), nil
}

candidates := filterSelectNodes(readyNodes, selectNodes)
if len(candidates) == 0 {
return nil, fmt.Errorf("no more nodes available for selection")
}

zoneCount := getZoneCount(selectNodes)
sort.Slice(candidates, func(i, j int) bool {
si := scoreNode(candidates[i], zoneCount)
sj := scoreNode(candidates[j], zoneCount)
return si < sj
})
selectNodes = append(selectNodes, candidates[0])
}

for _, node := range readyNodes {
if len(selectNodes) >= targetReplicas {
break
}
return getNodeNames(selectNodes), nil
}

if !slices.Contains(selectNodes, node.Name) {
selectNodes = append(selectNodes, node.Name)
}
func getZoneCount(selectedNodes []corev1.Node) map[string]int {
zoneCount := make(map[string]int)
for _, node := range selectedNodes {
zone := node.Labels[corev1.LabelTopologyZone]
zoneCount[zone]++
}
return zoneCount
}

func filterSelectNodes(readyNodes []corev1.Node, selectedNodes []corev1.Node) []corev1.Node {
var candidates []corev1.Node
selectedSet := make(map[string]bool)
for _, node := range selectedNodes {
selectedSet[node.Name] = true
}
for _, node := range readyNodes {
if !selectedSet[node.Name] {
candidates = append(candidates, node)
}
}
return candidates
}

return selectNodes, nil
func scoreNode(node corev1.Node, zoneCount map[string]int) int {
zone := node.Labels[corev1.LabelTopologyZone]
return zoneCount[zone]
}

func (r *ImagePrefetchReconciler) createOrUpdateNodeImageSet(ctx context.Context, imgPrefetch *ofenv1.ImagePrefetch, selectedNodes []string) error {
Expand Down
63 changes: 18 additions & 45 deletions internal/controller/imageprefetch_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ var _ = Describe("ImagePrefetch Controller", Serial, func() {
deleteImagePrefetchResource(ctx, imagePrefetch)
})

It("Should delete NodeImageSets when the ImagePrefetch resource is deleted", func() {
It("should delete NodeImageSets when the ImagePrefetch resource is deleted", func() {
By("creating a new ImagePrefetch with replicas")

testName := "confirm-delete-node-image-set"
Expand Down Expand Up @@ -340,7 +340,7 @@ var _ = Describe("ImagePrefetch Controller", Serial, func() {
}).Should(Succeed())
})

It("Should match node names in NodeImageSets with those in ImagePrefetch Status SelectedNodes", func() {
It("should match node names in NodeImageSets with those in ImagePrefetch Status SelectedNodes", func() {
By("creating a new ImagePrefetch with replicas")
testName := "image-prefetch-status"
replicas := 4
Expand Down Expand Up @@ -380,28 +380,13 @@ var _ = Describe("ImagePrefetch Controller", Serial, func() {
deleteImagePrefetchResource(ctx, imagePrefetch)
})

It("should create NodeImageSet on the node with the fewest images", func() {
By("increasing the images included in the nodes")
node0 := fmt.Sprintf("%s-0", nodePrefix)
node1 := fmt.Sprintf("%s-1", nodePrefix)
node2 := fmt.Sprintf("%s-2", nodePrefix)
for i := 0; i < 10; i++ {
updateNodeImage(ctx, node0, []string{fmt.Sprintf("dummy/%d", i)})
updateNodeImage(ctx, node1, []string{fmt.Sprintf("dummy/%d", i)})
updateNodeImage(ctx, node2, []string{fmt.Sprintf("dummy/%d", i)})
}

It("should create NodeImageSet for the each failure domains", func() {
By("creating imagePrefetch with replicas")
testName := "fewest-images"
testName := "failure-domains"
createNamespace(ctx, testName)
replicas := 1
replicas := 3
imagePrefetch := createNewImagePrefetch(ctx, testName, ofenv1.ImagePrefetchSpec{
Images: testImagesList,
NodeSelector: metav1.LabelSelector{
MatchLabels: map[string]string{
"topology.kubernetes.io/zone": "rack0",
},
},
Images: testImagesList,
Replicas: replicas,
})

Expand All @@ -414,7 +399,18 @@ var _ = Describe("ImagePrefetch Controller", Serial, func() {
})
g.Expect(err).NotTo(HaveOccurred())
g.Expect(nodeImageSets.Items).To(HaveLen(replicas))
g.Expect(nodeImageSets.Items[0].Spec.NodeName).To(Equal("worker-3"))

zoneSet := make(map[string]struct{})
for _, nodeImageSet := range nodeImageSets.Items {
nodeName := nodeImageSet.Spec.NodeName
node := corev1.Node{}
err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName}, &node)
g.Expect(err).NotTo(HaveOccurred())
zone := node.Labels["topology.kubernetes.io/zone"]
g.Expect(zone).ToNot(BeEmpty())
zoneSet[zone] = struct{}{}
}
g.Expect(zoneSet).To(HaveLen(2)) // 2 failure domains
}).Should(Succeed())

By("cleaning up the ImagePrefetch resource")
Expand Down Expand Up @@ -932,29 +928,6 @@ func deleteImagePrefetchResource(ctx context.Context, imagePrefetch *ofenv1.Imag
}).Should(Succeed())
}

func updateNodeImage(ctx context.Context, nodeName string, images []string) {
node := &corev1.Node{}
err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName}, node)
Expect(err).NotTo(HaveOccurred())

node.Status.Conditions = []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
}
err = k8sClient.Status().Update(ctx, node)
Expect(err).NotTo(HaveOccurred())

for _, image := range images {
node.Status.Images = append(node.Status.Images, corev1.ContainerImage{
Names: []string{image},
})
}
err = k8sClient.Status().Update(ctx, node)
Expect(err).NotTo(HaveOccurred())
}

func countRegistryPolicy(nodeImageSets *ofenv1.NodeImageSetList) (int, int) {
defaultPolicy, mirrorOnly := 0, 0
for _, nodeImageSet := range nodeImageSets.Items {
Expand Down