Skip to content

Release 1.7.4 #577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ RUN GIT_VERSION=$(git describe --tags --always) && \
FROM $BASE_IMAGE

WORKDIR /
COPY --from=public.ecr.aws/eks-distro/kubernetes/go-runner:v0.18.0-eks-1-32-11 /go-runner /usr/local/bin/go-runner
COPY --from=public.ecr.aws/eks-distro/kubernetes/go-runner:v0.18.0-eks-1-32-16 /go-runner /usr/local/bin/go-runner
COPY --from=builder /workspace/controller .

ENTRYPOINT ["/controller"]
129 changes: 80 additions & 49 deletions controllers/crds/cninode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -35,6 +36,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

Expand Down Expand Up @@ -75,7 +77,8 @@ type CNINodeReconciler struct {
clusterName string
vpcId string
finalizerManager k8s.FinalizerManager
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string) cleanup.ResourceCleaner
deletePool *semaphore.Weighted
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner
}

func NewCNINodeReconciler(
Expand All @@ -88,7 +91,8 @@ func NewCNINodeReconciler(
clusterName string,
vpcId string,
finalizerManager k8s.FinalizerManager,
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string) cleanup.ResourceCleaner,
maxConcurrentWorkers int,
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner,
) *CNINodeReconciler {
return &CNINodeReconciler{
Client: client,
Expand All @@ -100,6 +104,7 @@ func NewCNINodeReconciler(
clusterName: clusterName,
vpcId: vpcId,
finalizerManager: finalizerManager,
deletePool: semaphore.NewWeighted(int64(maxConcurrentWorkers)),
newResourceCleaner: newResourceCleaner,
}
}
Expand All @@ -118,7 +123,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
nodeFound := true
node := &v1.Node{}
if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
nodeFound = false
} else {
r.log.Error(err, "failed to get the node object in CNINode reconciliation, will retry")
Expand All @@ -128,66 +133,50 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

if cniNode.GetDeletionTimestamp().IsZero() {
shouldPatch := false
cniNodeCopy := cniNode.DeepCopy()
// Add cluster name tag if it does not exist
val, ok := cniNode.Spec.Tags[config.VPCCNIClusterNameKey]
if !ok || val != r.clusterName {
if len(cniNodeCopy.Spec.Tags) != 0 {
cniNodeCopy.Spec.Tags[config.VPCCNIClusterNameKey] = r.clusterName
} else {
cniNodeCopy.Spec.Tags = map[string]string{
config.VPCCNIClusterNameKey: r.clusterName,
}
}
shouldPatch = true
}
// if node exists, get & add OS label if it does not exist on CNINode
if nodeFound {
nodeLabelOS := node.ObjectMeta.Labels[config.NodeLabelOS]
val, ok = cniNode.ObjectMeta.Labels[config.NodeLabelOS]
if !ok || val != nodeLabelOS {
if len(cniNodeCopy.ObjectMeta.Labels) != 0 {
cniNodeCopy.ObjectMeta.Labels[config.NodeLabelOS] = nodeLabelOS
} else {
cniNodeCopy.ObjectMeta.Labels = map[string]string{
config.NodeLabelOS: nodeLabelOS,
}
}
shouldPatch = true
}
}
shouldPatch, err := r.ensureTagsAndLabels(cniNodeCopy, node)
shouldPatch = controllerutil.AddFinalizer(cniNodeCopy, config.NodeTerminationFinalizer) || shouldPatch

if shouldPatch {
r.log.Info("patching CNINode to add required fields Tags and Labels", "cninode", cniNode.Name)
return ctrl.Result{}, r.Client.Patch(ctx, cniNodeCopy, client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{}))
}

// Add finalizer if it does not exist
if err := r.finalizerManager.AddFinalizers(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
r.log.Error(err, "failed to add finalizer on CNINode, will retry", "cniNode", cniNode.Name, "finalizer", config.NodeTerminationFinalizer)
return ctrl.Result{}, err
r.log.Info("patching CNINode to add fields Tags, Labels and finalizer", "cninode", cniNode.Name)
if err := r.Client.Patch(ctx, cniNodeCopy, client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{})); err != nil {
if apierrors.IsConflict(err) {
r.log.Info("failed to update cninode", "cninode", cniNode.Name, "error", err)
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil

return ctrl.Result{}, err
} else { // CNINode is marked for deletion
if !nodeFound {
// node is also deleted, proceed with running the cleanup routine and remove the finalizer

// run cleanup for Linux nodes only
if val, ok := cniNode.ObjectMeta.Labels[config.NodeLabelOS]; ok && val == config.OSLinux {
r.log.Info("running the finalizer routine on cniNode", "cniNode", cniNode.Name)
// run cleanup when node id is present
if nodeID, ok := cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId).DeleteLeakedResources(); err != nil {
r.log.Error(err, "failed to cleanup resources during node termination")
ec2API.NodeTerminationENICleanupFailure.Inc()
if !r.deletePool.TryAcquire(1) {
r.log.Info("d, will requeue request")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
go func(nodeID string) {
defer r.deletePool.Release(1)
childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
defer cancel()
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
r.log.Error(err, "failed to cleanup resources during node termination")
ec2API.NodeTerminationENICleanupFailure.Inc()
}
}(nodeID)
}
}

if err := r.finalizerManager.RemoveFinalizers(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
if err := r.removeFinalizer(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
r.log.Error(err, "failed to remove finalizer on CNINode, will retry", "cniNode", cniNode.Name, "finalizer", config.NodeTerminationFinalizer)
if apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
Expand All @@ -207,7 +196,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
Spec: cniNode.Spec,
}

if err := r.finalizerManager.RemoveFinalizers(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
if err := r.removeFinalizer(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
r.log.Error(err, "failed to remove finalizer on CNINode, will retry")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -252,7 +241,7 @@ func (r *CNINodeReconciler) waitTillCNINodeDeleted(nameSpacedCNINode types.Names
oldCNINode := &v1alpha1.CNINode{}

return wait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, time.Second*3, true, func(ctx context.Context) (bool, error) {
if err := r.Client.Get(ctx, nameSpacedCNINode, oldCNINode); err != nil && errors.IsNotFound(err) {
if err := r.Client.Get(ctx, nameSpacedCNINode, oldCNINode); err != nil && apierrors.IsNotFound(err) {
return true, nil
}
return false, nil
Expand All @@ -266,3 +255,45 @@ func (r *CNINodeReconciler) createCNINodeFromObj(ctx context.Context, newCNINode
return r.Client.Create(ctx, newCNINode)
})
}

// ensureTagsAndLabels brings the given CNINode's spec tags and OS label in
// line with the reconciler's cluster name and, when a node is supplied, with
// that node. The CNINode is mutated in place; callers that need the original
// for a patch base should pass a copy.
//
// It returns true when at least one field was changed and therefore needs to
// be persisted. The returned error is whatever utils.GetNodeID reported for
// the node (nil when node is nil); the remaining fields are still processed
// when that lookup fails.
func (r *CNINodeReconciler) ensureTagsAndLabels(cniNode *v1alpha1.CNINode, node *v1.Node) (bool, error) {
	changed := false

	// Writing to a nil map panics, so make sure the tag map exists first.
	if cniNode.Spec.Tags == nil {
		cniNode.Spec.Tags = map[string]string{}
	}
	tags := cniNode.Spec.Tags

	// The cluster name tag must always match the reconciler's cluster name.
	if current := tags[config.VPCCNIClusterNameKey]; current != r.clusterName {
		tags[config.VPCCNIClusterNameKey] = r.clusterName
		changed = true
	}

	// Without a node there is nothing more to derive.
	if node == nil {
		return changed, nil
	}

	// NOTE(review): GetNodeID presumably derives the instance ID from the
	// node's provider ID — confirm against pkg/utils. An empty ID is never
	// written, so an existing tag is preserved when the lookup fails.
	nodeID, err := utils.GetNodeID(node)
	if nodeID != "" && tags[config.NetworkInterfaceNodeIDKey] != nodeID {
		tags[config.NetworkInterfaceNodeIDKey] = nodeID
		changed = true
	}

	// Mirror the node's OS label onto the CNINode.
	if cniNode.ObjectMeta.Labels == nil {
		cniNode.ObjectMeta.Labels = map[string]string{}
	}
	wantOS := node.ObjectMeta.Labels[config.NodeLabelOS]
	if cniNode.ObjectMeta.Labels[config.NodeLabelOS] != wantOS {
		cniNode.ObjectMeta.Labels[config.NodeLabelOS] = wantOS
		changed = true
	}

	return changed, err
}

// removeFinalizer strips the given finalizer from the CNINode and persists the
// change with a merge patch that uses an optimistic lock against the passed-in
// object. The passed-in cniNode itself is not modified; the change is made on
// a deep copy.
//
// It returns nil without contacting the API server when the finalizer is not
// present, otherwise it returns the result of the patch call.
func (r *CNINodeReconciler) removeFinalizer(ctx context.Context, cniNode *v1alpha1.CNINode, finalizer string) error {
	updated := cniNode.DeepCopy()

	// RemoveFinalizer reports whether it changed anything; skip the patch if not.
	if !controllerutil.RemoveFinalizer(updated, finalizer) {
		return nil
	}

	r.log.Info("removing finalizer for cninode", "name", cniNode.GetName(), "finalizer", finalizer)
	patch := client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{})
	return r.Client.Patch(ctx, updated, patch)
}
61 changes: 44 additions & 17 deletions controllers/crds/cninode_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api/cleanup"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
"github.com/go-logr/logr"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/semaphore"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -37,6 +39,9 @@ var (
config.NodeLabelOS: "linux",
},
},
Spec: corev1.NodeSpec{
ProviderID: "aws:///us-west-2a/i-0123456789abcdef0",
},
}
reconcileRequest = reconcile.Request{
NamespacedName: types.NamespacedName{
Expand All @@ -57,6 +62,7 @@ func NewCNINodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) *CNIN
log: zap.New(),
clusterName: mockClusterName,
vpcId: "vpc-000000000000",
deletePool: semaphore.NewWeighted(10),
},
}
}
Expand All @@ -80,7 +86,7 @@ func TestCNINodeReconcile(t *testing.T) {
asserts func(reconcile.Result, error, *v1alpha1.CNINode)
}{
{
name: "verify clusterName tag and labels are added if missing",
name: "verify clusterName, instanceID, os label are added if missing",
args: args{
mockNode: mockNodeWithLabel,
mockCNINode: &v1alpha1.CNINode{
Expand All @@ -94,7 +100,7 @@ func TestCNINodeReconcile(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, res, reconcile.Result{})
assert.Equal(t, cniNode.Labels, map[string]string{config.NodeLabelOS: "linux"})
assert.Equal(t, cniNode.Spec.Tags, map[string]string{config.VPCCNIClusterNameKey: mockClusterName})
assert.Equal(t, cniNode.Spec.Tags, map[string]string{config.VPCCNIClusterNameKey: mockClusterName, config.NetworkInterfaceNodeIDKey: "i-0123456789abcdef0"})
},
},
{
Expand All @@ -113,14 +119,10 @@ func TestCNINodeReconcile(t *testing.T) {
},
},
prepare: func(f *fields) {
f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string) cleanup.ResourceCleaner {
f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner {
return f.mockResourceCleaner
}
f.mockResourceCleaner.EXPECT().DeleteLeakedResources().Times(0)

f.mockFinalizerManager.EXPECT().
RemoveFinalizers(gomock.Any(), gomock.Any(), config.NodeTerminationFinalizer).
Return(nil)
f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(0)
},
asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) {
assert.NoError(t, err)
Expand All @@ -142,27 +144,50 @@ func TestCNINodeReconcile(t *testing.T) {
},
Spec: v1alpha1.CNINodeSpec{
Tags: map[string]string{
config.NetworkInterfaceNodeIDKey: "i-1234567890",
config.NetworkInterfaceNodeIDKey: "i-0123456789abcdef0",
},
},
},
},
prepare: func(f *fields) {
f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string) cleanup.ResourceCleaner {
assert.Equal(t, "i-1234567890", nodeID)
f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner {
assert.Equal(t, "i-0123456789abcdef0", nodeID)
return f.mockResourceCleaner
}
f.mockResourceCleaner.EXPECT().DeleteLeakedResources().Times(1).Return(nil)
f.mockFinalizerManager.EXPECT().
RemoveFinalizers(gomock.Any(), gomock.Any(), config.NodeTerminationFinalizer).
Return(nil)
f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(1).Return(nil)

},
asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) {
assert.NoError(t, err)
assert.Equal(t, res, reconcile.Result{})
},
},
{
name: "verify finalizer is added when labels and tags are present",
args: args{
mockNode: mockNodeWithLabel,
mockCNINode: &v1alpha1.CNINode{
ObjectMeta: metav1.ObjectMeta{
Name: mockName,
Labels: map[string]string{
config.NodeLabelOS: "linux",
},
},
Spec: v1alpha1.CNINodeSpec{
Tags: map[string]string{
config.VPCCNIClusterNameKey: mockClusterName,
config.NetworkInterfaceNodeIDKey: "i-0123456789abcdef0",
},
},
},
},
prepare: nil,
asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) {
assert.NoError(t, err)
assert.Equal(t, res, reconcile.Result{})
assert.Contains(t, cniNode.Finalizers, config.NodeTerminationFinalizer)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -188,8 +213,10 @@ func TestCNINodeReconcile(t *testing.T) {
res, err := mock.Reconciler.Reconcile(context.Background(), reconcileRequest)

cniNode := &v1alpha1.CNINode{}
getErr := mock.Reconciler.Client.Get(context.Background(), reconcileRequest.NamespacedName, cniNode)
assert.NoError(t, getErr)
if tt.args.mockCNINode.GetDeletionTimestamp() == nil {
getErr := mock.Reconciler.Client.Get(context.Background(), reconcileRequest.NamespacedName, cniNode)
assert.NoError(t, getErr)
}

if tt.asserts != nil {
tt.asserts(res, err, cniNode)
Expand Down
2 changes: 1 addition & 1 deletion controllers/custom/custom_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (c *CustomController) Start(ctx context.Context) error {
func (c *CustomController) WaitForCacheSync(controller cache.Controller) {
for !controller.HasSynced() && controller.LastSyncResourceVersion() == "" {
c.log.Info("waiting for controller to sync")
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 1)
}
c.conditions.SetPodDataStoreSyncStatus(true)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/prometheus/common v0.62.0
github.com/stretchr/testify v1.10.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.13.0
golang.org/x/time v0.11.0
gomodules.xyz/jsonpatch/v2 v2.4.0
k8s.io/api v0.33.0
Expand All @@ -50,7 +51,6 @@ require (
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/sync v0.13.0 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
)
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func main() {
flag.BoolVar(&enableProfiling, "enable-profiling", false,
"Enable runtime profiling for debugging purposes.")
flag.StringVar(&clusterName, "cluster-name", "", "The name of the k8s cluster")
flag.IntVar(&listPageLimit, "page-limit", 100,
flag.IntVar(&listPageLimit, "page-limit", 1000,
"The page size limiting the number of response for list operation to API Server")
flag.StringVar(&outputPath, "log-file", "stderr", "The path to redirect controller logs")
flag.IntVar(&healthCheckTimeout, "health-check-timeout", 28,
Expand Down Expand Up @@ -439,6 +439,7 @@ func main() {
clusterName,
vpcID,
finalizerManager,
maxNodeConcurrentReconciles,
cleanup.NewNodeResourceCleaner,
).SetupWithManager(mgr, maxNodeConcurrentReconciles)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CNINode")
Expand Down
Loading
Loading