
[Backport release-1.30] Wait for lease pool shutdown when stopping components #6019

Open · wants to merge 5 commits into base: release-1.30
43 changes: 20 additions & 23 deletions cmd/controller/controller.go
@@ -36,6 +36,7 @@ import (
k0slog "github.com/k0sproject/k0s/internal/pkg/log"
"github.com/k0sproject/k0s/internal/pkg/stringmap"
"github.com/k0sproject/k0s/internal/pkg/sysinfo"
"github.com/k0sproject/k0s/internal/sync/value"
"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/applier"
apclient "github.com/k0sproject/k0s/pkg/autopilot/client"
@@ -218,15 +219,8 @@ func (c *command) start(ctx context.Context) error {
logrus.Infof("using storage backend %s", nodeConfig.Spec.Storage.Type)
nodeComponents.Add(ctx, storageBackend)

-	controllerLeaseCounter := &controller.K0sControllersLeaseCounter{
-		InvocationID:      c.K0sVars.InvocationID,
-		ClusterConfig:     nodeConfig,
-		KubeClientFactory: adminClientFactory,
-	}
-
-	if !c.SingleNode {
-		nodeComponents.Add(ctx, controllerLeaseCounter)
-	}
+	// Assume a single active controller during startup
+	numActiveControllers := value.NewLatest[uint](1)

if cplb := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplb != nil && cplb.Enabled {
if c.SingleNode {
@@ -249,13 +243,10 @@ func (c *command) start(ctx context.Context) error {

if enableKonnectivity {
nodeComponents.Add(ctx, &controller.Konnectivity{
-			SingleNode:                 c.SingleNode,
-			LogLevel:                   c.Logging[constant.KonnectivityServerComponentName],
-			K0sVars:                    c.K0sVars,
-			KubeClientFactory:          adminClientFactory,
-			NodeConfig:                 nodeConfig,
-			EventEmitter:               prober.NewEventEmitter(),
-			K0sControllersLeaseCounter: controllerLeaseCounter,
+			K0sVars:      c.K0sVars,
+			LogLevel:     c.Logging[constant.KonnectivityServerComponentName],
+			EventEmitter: prober.NewEventEmitter(),
+			ServerCount:  numActiveControllers.Peek,
})
}

@@ -268,6 +259,15 @@ func (c *command) start(ctx context.Context) error {
DisableEndpointReconciler: disableEndpointReconciler,
})

+	if !c.SingleNode {
+		nodeComponents.Add(ctx, &controller.K0sControllersLeaseCounter{
+			InvocationID:          c.K0sVars.InvocationID,
+			ClusterConfig:         nodeConfig,
+			KubeClientFactory:     adminClientFactory,
+			UpdateControllerCount: numActiveControllers.Set,
+		})
+	}

var leaderElector interface {
leaderelector.Interface
manager.Component
@@ -520,13 +520,10 @@ func (c *command) start(ctx context.Context) error {

if enableKonnectivity {
clusterComponents.Add(ctx, &controller.KonnectivityAgent{
-			SingleNode:                 c.SingleNode,
-			LogLevel:                   c.Logging[constant.KonnectivityServerComponentName],
-			K0sVars:                    c.K0sVars,
-			KubeClientFactory:          adminClientFactory,
-			NodeConfig:                 nodeConfig,
-			EventEmitter:               prober.NewEventEmitter(),
-			K0sControllersLeaseCounter: controllerLeaseCounter,
+			K0sVars:       c.K0sVars,
+			APIServerHost: nodeConfig.Spec.API.APIAddress(),
+			EventEmitter:  prober.NewEventEmitter(),
+			ServerCount:   numActiveControllers.Peek,
})
}

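The wiring above inverts the old dependency: instead of handing the Konnectivity components a reference to the lease counter, the lease counter now publishes into a shared value.Latest[uint] (UpdateControllerCount: numActiveControllers.Set) and the Konnectivity components read from it (ServerCount: numActiveControllers.Peek). A minimal sketch of how a consumer can combine the latest value with its expiration channel; watchServerCount and reconfigure are hypothetical helpers, not part of this PR:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/k0sproject/k0s/internal/sync/value"
)

// watchServerCount re-applies the latest controller count whenever it
// changes; peek has the signature of value.Latest[uint].Peek.
func watchServerCount(ctx context.Context, peek func() (uint, <-chan struct{})) {
	for {
		count, expired := peek()
		reconfigure(count)
		select {
		case <-ctx.Done():
			return
		case <-expired:
			// A newer count has been published via Set; loop and re-read.
		}
	}
}

// reconfigure stands in for whatever the component does with the count.
func reconfigure(count uint) { fmt.Printf("server count is now %d\n", count) }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	counts := value.NewLatest[uint](1)
	go func() { counts.Set(2) }() // a second controller shows up
	watchServerCount(ctx, counts.Peek)
}
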
109 changes: 109 additions & 0 deletions internal/sync/value/latest.go
@@ -0,0 +1,109 @@
/*
Copyright 2024 k0s authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package value

import "sync/atomic"

// A value that can be atomically updated, where each update invalidates the
// previous value. Whenever the value changes, an associated expiration channel
// is closed. Callers can use this to be notified of updates. The zero value of
// Latest holds a zero value of T.
//
// Latest is useful when some shared state is updated frequently, and readers
// don't need to keep track of every value, just the latest one. Latest makes
// this easy, as there's no need to maintain a separate channel for each reader.
//
// Example Usage:
//
// package main
//
// import (
// "fmt"
// "sync"
// "time"
//
// "github.com/k0sproject/k0s/internal/sync/value"
// )
//
// func main() {
// // Declare a zero latest value
// var l value.Latest[int]
//
// fmt.Println("Zero value:", l.Get()) // Output: Zero value: 0
//
// // Set the value
// l.Set(42)
// fmt.Println("Value set to 42")
//
// // Peek at the current value and get the expiration channel
// value, expired := l.Peek()
// fmt.Println("Peeked value:", value) // Output: Peeked value: 42
//
// // Use a goroutine to watch for expiration
// var wg sync.WaitGroup
// wg.Add(1)
// go func() {
// defer wg.Done()
// <-expired
// fmt.Println("Value expired, new value:", l.Get()) // Output: Value expired, new value: 84
// }()
//
// // Set a new value, which will expire the previous value
// time.Sleep(1 * time.Second) // Simulate some delay
// l.Set(84)
// fmt.Println("New value set to 84")
//
// wg.Wait() // Wait for the watcher goroutine to finish
// }
type Latest[T any] struct {
p atomic.Pointer[val[T]]
}

func NewLatest[T any](value T) *Latest[T] {
latest := new(Latest[T])
latest.Set(value)
return latest
}

// Retrieves the latest value and its associated expiration channel. If no value
// was previously set, it returns the zero value of T and an expiration channel
// that is closed as soon as a value is set.
func (l *Latest[T]) Peek() (T, <-chan struct{}) {
if loaded := l.p.Load(); loaded != nil {
return loaded.inner, loaded.ch
}

value := val[T]{ch: make(chan struct{})}
if !l.p.CompareAndSwap(nil, &value) {
loaded := l.p.Load()
return loaded.inner, loaded.ch
}

return value.inner, value.ch
}

// Sets a new value and closes the expiration channel of the previous value.
func (l *Latest[T]) Set(value T) {
if old := l.p.Swap(&val[T]{value, make(chan struct{})}); old != nil {
close(old.ch)
}
}

type val[T any] struct {
inner T
ch chan struct{}
}
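
Why Peek uses CompareAndSwap rather than a plain store: two goroutines peeking a zero-valued Latest concurrently must observe the same expiration channel, otherwise one of them would wait forever on a channel that Set never closes. A small sketch of that property (hypothetical example code, not part of this PR):

package value_test

import (
	"fmt"

	"github.com/k0sproject/k0s/internal/sync/value"
)

func ExampleLatest_sharedExpiration() {
	var l value.Latest[string]
	_, ch1 := l.Peek() // wins the CompareAndSwap and installs a channel
	_, ch2 := l.Peek() // loses the race path and reloads the same channel
	l.Set("hello")     // closes the single shared channel: both peeks expire
	<-ch1
	<-ch2
	fmt.Println(ch1 == ch2)
	// Output: true
}
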
53 changes: 53 additions & 0 deletions internal/sync/value/latest_test.go
@@ -0,0 +1,53 @@
/*
Copyright 2024 k0s authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package value_test

import (
"sync"
"testing"
"time"

"github.com/k0sproject/k0s/internal/sync/value"

"github.com/stretchr/testify/assert"
)

func TestLatest(t *testing.T) {
var underTest value.Latest[int]
value, expired := underTest.Peek()
assert.Zero(t, value, "Zero latest should return zero value")
assert.NotNil(t, expired)

var got int
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-expired
got, _ = underTest.Peek()
}()

time.Sleep(10 * time.Millisecond) // Simulate some delay
underTest.Set(42)
wg.Wait()

assert.Equal(t, 42, got)

newValue, newExpired := underTest.Peek()
assert.Equal(t, 42, newValue)
assert.NotEqual(t, expired, newExpired)
}
13 changes: 6 additions & 7 deletions pkg/autopilot/controller/leases.go
@@ -61,7 +61,7 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
leaseEventStatusCh := make(chan LeaseEventStatus, 10)
errorCh := make(chan error, 10)

-	go func(ctx context.Context) {
+	go func() {
defer close(leaseEventStatusCh)
defer close(errorCh)

@@ -76,21 +76,19 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
return

default:
-				ctx, cancel := context.WithCancel(ctx)

leasePoolOpts := []leaderelection.LeaseOpt{
-					leaderelection.WithContext(ctx),
leaderelection.WithNamespace(namespace),
}

-				leasePool, err := leaderelection.NewLeasePool(ctx, lw.client, name, identity, leasePoolOpts...)
+				leasePool, err := leaderelection.NewLeasePool(lw.client, name, identity, leasePoolOpts...)
if err != nil {
errorCh <- fmt.Errorf("failed to create lease pool: %w", err)
-					cancel()
return
}

-				events, _, err := leasePool.Watch()
+				ctx, cancel := context.WithCancel(ctx)
+				events, watchDone, err := leasePool.Watch(ctx)
if err != nil {
errorCh <- fmt.Errorf("failed to watch lease pool: %w", err)
cancel()
@@ -101,9 +99,10 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
watchWg.Wait()

cancel()
+				<-watchDone
}
}
-	}(ctx)
+	}()

return leaseEventStatusCh, errorCh
}
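
This hunk is the heart of the backport: previously the watcher discarded the done channel returned by Watch (events, _, err := leasePool.Watch()), so cancel() could return while the pool's goroutines were still shutting down. The new code threads a per-iteration context into Watch and blocks on watchDone before starting the next lease pool. A generic sketch of this cancel-then-wait handshake, with hypothetical event/produce stand-ins for the lease pool internals:

package main

import "context"

type event struct{}

func produce() event { return event{} }

// watch returns an events channel plus a done channel that is closed only
// after the worker goroutine has fully exited, so callers can cancel the
// context and then block until shutdown has actually completed.
func watch(ctx context.Context) (<-chan event, <-chan struct{}) {
	events := make(chan event)
	done := make(chan struct{})
	go func() {
		defer close(done) // last thing to happen: signal full shutdown
		defer close(events)
		for {
			select {
			case <-ctx.Done():
				return
			case events <- produce():
			}
		}
	}()
	return events, done
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	_, done := watch(ctx)
	cancel() // ask the watcher to stop...
	<-done   // ...and wait until it actually has
}
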
4 changes: 2 additions & 2 deletions pkg/autopilot/controller/updates/clusterinfo.go
@@ -36,7 +36,7 @@ type ClusterInfo struct {
K0sVersion string
StorageType string
ClusterID string
-	ControlPlaneNodesCount int
+	ControlPlaneNodesCount uint
WorkerData WorkerData
CNIProvider string
Arch string
@@ -121,7 +121,7 @@ func CollectData(ctx context.Context, kc kubernetes.Interface) (*ClusterInfo, er
}

// Collect control plane node count
-	ci.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, kc)
+	ci.ControlPlaneNodesCount, err = kubeutil.CountActiveControllerLeases(ctx, kc)
if err != nil {
return ci, fmt.Errorf("can't collect control plane nodes count: %w", err)
}
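
Swapping kubeutil.GetControlPlaneNodeCount for kubeutil.CountActiveControllerLeases means the telemetry now counts controllers holding a live lease rather than registered Kubernetes node objects. A hedged sketch of what counting active controller leases can look like; the namespace, the k0s-ctrl- name prefix, and the exact validity check are assumptions for illustration, not code from this PR:

package sketch

import (
	"context"
	"strings"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func countActiveControllerLeases(ctx context.Context, kc kubernetes.Interface) (uint, error) {
	leases, err := kc.CoordinationV1().Leases("kube-node-lease").List(ctx, metav1.ListOptions{})
	if err != nil {
		return 0, err
	}
	var count uint
	now := time.Now()
	for _, lease := range leases.Items {
		if !strings.HasPrefix(lease.Name, "k0s-ctrl-") {
			continue // not a controller lease (assumed naming scheme)
		}
		if lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil {
			continue
		}
		// Assumption: a lease counts as active if it was renewed within its
		// advertised duration.
		if now.Sub(lease.Spec.RenewTime.Time) < time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second {
			count++
		}
	}
	return count, nil
}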