Skip to content

Iptrestore timeout #2122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,15 @@ func main() {
// In Docker 1.12 and earlier, the default FORWARD chain policy was ACCEPT.
// In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP.
if opts.iptablesForwardRules {
trafficMngr.SetupAndEnsureForwardRules(ctx,
if err := trafficMngr.SetupAndEnsureForwardRules(ctx,
config.Network,
config.IPv6Network,
opts.iptablesResyncSeconds)
opts.iptablesResyncSeconds); err != nil {
log.Errorf("Failed to setup forward rules, %v", err)
cancel()
wg.Wait()
os.Exit(1)
}
}

if err := sm.HandleSubnetFile(opts.subnetFile, config, opts.ipMasq, bn.Lease().Subnet, bn.Lease().IPv6Subnet, bn.MTU()); err != nil {
Expand Down
24 changes: 24 additions & 0 deletions pkg/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package retry

import (
"context"
"time"

log "k8s.io/klog/v2"

retry_v4 "github.com/avast/retry-go/v4"
Expand All @@ -31,3 +34,24 @@ func Do(f func() error) error {
)

}

func DoUntil(ctx context.Context, f func() error, timeout time.Duration) error {
toctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return retry_v4.Do(f,
retry_v4.Context(toctx),
retry_v4.OnRetry(func(n uint, err error) {
log.Errorf("#%d: %s\n", n, err)
}),
retry_v4.Attempts(0),
retry_v4.WrapContextErrorWithLastError(true))
}

// DoWithOptions executes f until it no longer returns an error.
// By default, the number of attempts is 10, with an increasing delay between
// each attempt. Any Options from the retry package passed in opts are
// forwarded unchanged to the underlying retry call.
func DoWithOptions(f func() error, opts ...retry_v4.Option) error {
	err := retry_v4.Do(f, opts...)
	return err
}
6 changes: 0 additions & 6 deletions pkg/subnet/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,19 +422,13 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *lease.Lea
Expiration: time.Now().Add(24 * time.Hour),
}
if cidr != nil && ksm.enableIPv4 {
if err != nil {
return nil, err
}
if !containsCIDR(ksm.subnetConf.Network.ToIPNet(), cidr) {
return nil, fmt.Errorf("subnet %q specified in the flannel net config doesn't contain %q PodCIDR of the %q node", ksm.subnetConf.Network, cidr, ksm.nodeName)
}

lease.Subnet = ip.FromIPNet(cidr)
}
if ipv6Cidr != nil {
if err != nil {
return nil, err
}
if !containsCIDR(ksm.subnetConf.IPv6Network.ToIPNet(), ipv6Cidr) {
return nil, fmt.Errorf("subnet %q specified in the flannel net config doesn't contain %q IPv6 PodCIDR of the %q node", ksm.subnetConf.IPv6Network, ipv6Cidr, ksm.nodeName)
}
Expand Down
76 changes: 35 additions & 41 deletions pkg/trafficmngr/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ func (iptm *IPTablesManager) Init(ctx context.Context, wg *sync.WaitGroup) error
go func() {
<-ctx.Done()
time.Sleep(time.Second)
err := iptm.cleanUp()
cleanupCtx, cleanUpCancelFunc := context.WithTimeout(context.Background(), trafficmngr.CleanUpDeadline*time.Second)
defer cleanUpCancelFunc()
err := iptm.cleanUp(cleanupCtx)
if err != nil {
log.Errorf("iptables: error while cleaning-up: %v", err)
}
Expand All @@ -66,7 +68,7 @@ func (iptm *IPTablesManager) Init(ctx context.Context, wg *sync.WaitGroup) error
return nil
}

func (iptm *IPTablesManager) cleanUp() error {
func (iptm *IPTablesManager) cleanUp(ctx context.Context) error {
if len(iptm.ipv4Rules) > 0 {
ipt, err := iptables.New()
if err != nil {
Expand All @@ -79,7 +81,7 @@ func (iptm *IPTablesManager) cleanUp() error {
return fmt.Errorf("failed to setup IPTables. iptables-restore binary was not found: %v", err)
}
log.Info("iptables (ipv4): cleaning-up before exiting flannel...")
err = teardownIPTables(ipt, iptRestore, iptm.ipv4Rules)
err = teardownIPTables(ctx, ipt, iptRestore, iptm.ipv4Rules)
if err != nil {
log.Errorf("Failed to tear down IPTables: %v", err)
}
Expand All @@ -96,7 +98,7 @@ func (iptm *IPTablesManager) cleanUp() error {
return fmt.Errorf("failed to setup IPTables. iptables-restore binary was not found: %v", err)
}
log.Info("iptables (ipv6): cleaning-up before exiting flannel...")
err = teardownIPTables(ipt, iptRestore, iptm.ipv6Rules)
err = teardownIPTables(ctx, ipt, iptRestore, iptm.ipv6Rules)
if err != nil {
log.Errorf("Failed to tear down IPTables: %v", err)
}
Expand All @@ -117,14 +119,17 @@ func (iptm *IPTablesManager) SetupAndEnsureMasqRules(ctx context.Context, flanne
newLease := &lease.Lease{
Subnet: prevSubnet,
}
if err := iptm.deleteIP4Tables(iptm.masqRules(prevNetwork, newLease)); err != nil {
if err := iptm.deleteIP4Tables(ctx, iptm.masqRules(prevNetwork, newLease)); err != nil {
return err
}
}

log.Infof("Setting up masking rules")
iptm.CreateIP4Chain("nat", "FLANNEL-POSTRTG")
go iptm.setupAndEnsureIP4Tables(ctx, iptm.masqRules(flannelIPv4Net, currentlease), resyncPeriod)

if err := iptm.setupAndEnsureIP4Tables(ctx, iptm.masqRules(flannelIPv4Net, currentlease)); err != nil {
return err
}
}
if !flannelIPv6Net.Empty() {
// recycle iptables rules only when network configured or subnet leased is not equal to current one.
Expand All @@ -134,7 +139,7 @@ func (iptm *IPTablesManager) SetupAndEnsureMasqRules(ctx context.Context, flanne
newLease := &lease.Lease{
IPv6Subnet: prevIPv6Subnet,
}
if err := iptm.deleteIP6Tables(iptm.masqIP6Rules(prevIPv6Network, newLease)); err != nil {
if err := iptm.deleteIP6Tables(ctx, iptm.masqIP6Rules(prevIPv6Network, newLease)); err != nil {
return err
}
}
Expand Down Expand Up @@ -221,17 +226,20 @@ func (iptm *IPTablesManager) masqIP6Rules(ccidr ip.IP6Net, lease *lease.Lease) [
return rules
}

func (iptm *IPTablesManager) SetupAndEnsureForwardRules(ctx context.Context, flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) {
func (iptm *IPTablesManager) SetupAndEnsureForwardRules(ctx context.Context, flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) error {
if !flannelIPv4Network.Empty() {
log.Infof("Changing default FORWARD chain policy to ACCEPT")
iptm.CreateIP4Chain("filter", "FLANNEL-FWD")
go iptm.setupAndEnsureIP4Tables(ctx, iptm.forwardRules(flannelIPv4Network.String()), resyncPeriod)
if err := iptm.setupAndEnsureIP4Tables(ctx, iptm.forwardRules(flannelIPv4Network.String())); err != nil {
return err
}
}
if !flannelIPv6Network.Empty() {
log.Infof("IPv6: Changing default FORWARD chain policy to ACCEPT")
iptm.CreateIP6Chain("filter", "FLANNEL-FWD")
go iptm.setupAndEnsureIP6Tables(ctx, iptm.forwardRules(flannelIPv6Network.String()), resyncPeriod)
}
return nil
}

func (iptm *IPTablesManager) forwardRules(flannelNetwork string) []trafficmngr.IPTablesRule {
Expand Down Expand Up @@ -355,7 +363,7 @@ func ipTablesCleanAndBuild(ipt IPTables, rules []trafficmngr.IPTablesRule) (IPTa
}

// ipTablesBootstrap initializes iptables rules using iptables-restore (with some cleaning if some rules already exist)
func ipTablesBootstrap(ipt IPTables, iptRestore IPTablesRestore, rules []trafficmngr.IPTablesRule) error {
func ipTablesBootstrap(ctx context.Context, ipt IPTables, iptRestore IPTablesRestore, rules []trafficmngr.IPTablesRule) error {
tablesRules, err := ipTablesCleanAndBuild(ipt, rules)
if err != nil {
// if we can't find iptables or if we can't check existing rules, give up and return
Expand All @@ -364,7 +372,7 @@ func ipTablesBootstrap(ipt IPTables, iptRestore IPTablesRestore, rules []traffic

log.V(6).Infof("trying to run iptables-restore < %+v", tablesRules)

err = iptRestore.ApplyWithoutFlush(tablesRules)
err = iptRestore.ApplyWithoutFlush(ctx, tablesRules)
if err != nil {
return fmt.Errorf("failed to apply partial iptables-restore %v", err)
}
Expand All @@ -374,40 +382,26 @@ func ipTablesBootstrap(ipt IPTables, iptRestore IPTablesRestore, rules []traffic
return nil
}

func (iptm *IPTablesManager) setupAndEnsureIP4Tables(ctx context.Context, rules []trafficmngr.IPTablesRule, resyncPeriod int) {
func (iptm *IPTablesManager) setupAndEnsureIP4Tables(ctx context.Context, rules []trafficmngr.IPTablesRule) error {
ipt, err := iptables.New()
if err != nil {
// if we can't find iptables, give up and return
log.Errorf("Failed to setup IPTables. iptables binary was not found: %v", err)
return
return fmt.Errorf("Failed to setup IPTables. iptables binary was not found: %v", err)
}
iptRestore, err := NewIPTablesRestoreWithProtocol(iptables.ProtocolIPv4)
if err != nil {
// if we can't find iptables-restore, give up and return
log.Errorf("Failed to setup IPTables. iptables-restore binary was not found: %v", err)
return
return fmt.Errorf("Failed to setup IPTables. iptables-restore binary was not found: %v", err)
}

err = ipTablesBootstrap(ipt, iptRestore, rules)
err = ipTablesBootstrap(ctx, ipt, iptRestore, rules)
if err != nil {
// if we can't find iptables, give up and return
log.Errorf("Failed to bootstrap IPTables: %v", err)
return fmt.Errorf("Failed to bootstrap IPTables: %v", err)
}

iptm.ipv4Rules = append(iptm.ipv4Rules, rules...)
for {
select {
case <-ctx.Done():
//clean-up is setup in Init
return
case <-time.After(time.Duration(resyncPeriod) * time.Second):
// Ensure that all the iptables rules exist every 5 seconds
if err := ensureIPTables(ipt, iptRestore, rules); err != nil {
log.Errorf("Failed to ensure iptables rules: %v", err)
}
}

}
return ensureIPTables(ctx, ipt, iptRestore, rules)
}

func (iptm *IPTablesManager) setupAndEnsureIP6Tables(ctx context.Context, rules []trafficmngr.IPTablesRule, resyncPeriod int) {
Expand All @@ -424,7 +418,7 @@ func (iptm *IPTablesManager) setupAndEnsureIP6Tables(ctx context.Context, rules
return
}

err = ipTablesBootstrap(ipt, iptRestore, rules)
err = ipTablesBootstrap(ctx, ipt, iptRestore, rules)
if err != nil {
// if we can't find iptables, give up and return
log.Errorf("Failed to bootstrap IPTables: %v", err)
Expand All @@ -438,15 +432,15 @@ func (iptm *IPTablesManager) setupAndEnsureIP6Tables(ctx context.Context, rules
return
case <-time.After(time.Duration(resyncPeriod) * time.Second):
// Ensure that all the iptables rules exist every 5 seconds
if err := ensureIPTables(ipt, iptRestore, rules); err != nil {
if err := ensureIPTables(ctx, ipt, iptRestore, rules); err != nil {
log.Errorf("Failed to ensure iptables rules: %v", err)
}
}
}
}

// deleteIP4Tables delete specified iptables rules
func (iptm *IPTablesManager) deleteIP4Tables(rules []trafficmngr.IPTablesRule) error {
func (iptm *IPTablesManager) deleteIP4Tables(ctx context.Context, rules []trafficmngr.IPTablesRule) error {
ipt, err := iptables.New()
if err != nil {
// if we can't find iptables, give up and return
Expand All @@ -459,7 +453,7 @@ func (iptm *IPTablesManager) deleteIP4Tables(rules []trafficmngr.IPTablesRule) e
log.Errorf("Failed to setup iptables-restore: %v", err)
return err
}
err = teardownIPTables(ipt, iptRestore, rules)
err = teardownIPTables(ctx, ipt, iptRestore, rules)
if err != nil {
log.Errorf("Failed to teardown iptables: %v", err)
return err
Expand All @@ -468,7 +462,7 @@ func (iptm *IPTablesManager) deleteIP4Tables(rules []trafficmngr.IPTablesRule) e
}

// deleteIP6Tables delete specified iptables rules
func (iptm *IPTablesManager) deleteIP6Tables(rules []trafficmngr.IPTablesRule) error {
func (iptm *IPTablesManager) deleteIP6Tables(ctx context.Context, rules []trafficmngr.IPTablesRule) error {
ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
if err != nil {
// if we can't find iptables, give up and return
Expand All @@ -482,15 +476,15 @@ func (iptm *IPTablesManager) deleteIP6Tables(rules []trafficmngr.IPTablesRule) e
log.Errorf("Failed to setup iptables-restore: %v", err)
return err
}
err = teardownIPTables(ipt, iptRestore, rules)
err = teardownIPTables(ctx, ipt, iptRestore, rules)
if err != nil {
log.Errorf("Failed to teardown iptables: %v", err)
return err
}
return nil
}

func ensureIPTables(ipt IPTables, iptRestore IPTablesRestore, rules []trafficmngr.IPTablesRule) error {
func ensureIPTables(ctx context.Context, ipt IPTables, iptRestore IPTablesRestore, rules []trafficmngr.IPTablesRule) error {
exists, err := ipTablesRulesExist(ipt, rules)
if err != nil {
return fmt.Errorf("error checking rule existence: %v", err)
Expand All @@ -502,15 +496,15 @@ func ensureIPTables(ipt IPTables, iptRestore IPTablesRestore, rules []trafficmng
// Otherwise, teardown all the rules and set them up again
// We do this because the order of the rules is important
log.Info("Some iptables rules are missing; deleting and recreating rules")
err = ipTablesBootstrap(ipt, iptRestore, rules)
err = ipTablesBootstrap(ctx, ipt, iptRestore, rules)
if err != nil {
// if we can't find iptables, give up and return
return fmt.Errorf("error setting up rules: %v", err)
}
return nil
}

func teardownIPTables(ipt IPTables, iptr IPTablesRestore, rules []trafficmngr.IPTablesRule) error {
func teardownIPTables(ctx context.Context, ipt IPTables, iptr IPTablesRestore, rules []trafficmngr.IPTablesRule) error {
tablesRules := IPTablesRestoreRules{}

// Build delete rules to a transaction for iptables restore
Expand Down Expand Up @@ -547,7 +541,7 @@ func teardownIPTables(ipt IPTables, iptr IPTablesRestore, rules []trafficmngr.IP
tablesRules[rule.Table] = append(tablesRules[rule.Table], append(IPTablesRestoreRuleSpec{"-D", rule.Chain}, rule.Rulespec...))
}
}
err := iptr.ApplyWithoutFlush(tablesRules) // ApplyWithoutFlush make a diff, Apply make a replace (desired state)
err := iptr.ApplyWithoutFlush(ctx, tablesRules) // ApplyWithoutFlush make a diff, Apply make a replace (desired state)
if err != nil {
return fmt.Errorf("unable to teardown iptables: %v", err)
}
Expand Down
28 changes: 23 additions & 5 deletions pkg/trafficmngr/iptables/iptables_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package iptables

import (
"bytes"
"context"
"fmt"
"io"
"os/exec"
"regexp"
"strconv"
"sync"
"time"

"github.com/coreos/go-iptables/iptables"
"github.com/flannel-io/flannel/pkg/retry"
log "k8s.io/klog/v2"
)

Expand All @@ -34,12 +37,14 @@ const (
ip6TablesRestoreCmd string = "ip6tables-restore"
ipTablesCmd string = "iptables"
ip6TablesCmd string = "ip6tables"
// maximum delay in seconds before iptables-restore is considered to have failed
iptRestoreTimeout = 15
)

// IPTablesRestore wrapper for iptables-restore
type IPTablesRestore interface {
// ApplyWithoutFlush apply without flush chains
ApplyWithoutFlush(rules IPTablesRestoreRules) error
ApplyWithoutFlush(ctx context.Context, rules IPTablesRestoreRules) error
}

// ipTablesRestore internal type
Expand Down Expand Up @@ -86,16 +91,29 @@ func NewIPTablesRestoreWithProtocol(protocol iptables.Protocol) (IPTablesRestore
}

// ApplyWithoutFlush apply without flush chains
func (iptr *ipTablesRestore) ApplyWithoutFlush(rules IPTablesRestoreRules) error {
func (iptr *ipTablesRestore) ApplyWithoutFlush(ctx context.Context, rules IPTablesRestoreRules) error {
iptr.mu.Lock()
defer iptr.mu.Unlock()
payload := buildIPTablesRestorePayload(rules)

log.V(6).Infof("trying to run with payload %s", payload)
stdout, stderr, err := iptr.runWithOutput([]string{"--noflush"}, bytes.NewBuffer([]byte(payload)))
log.V(6).Infof("trying to run iptables-restore with payload %s", payload)

err := retry.DoUntil(ctx,
func() error {
stdout, stderr, err := iptr.runWithOutput([]string{"--noflush"}, bytes.NewBuffer([]byte(payload)))
if err != nil {
log.Errorf("iptables-restore finished with error: %v", err)
log.Errorf("stdout: %s", stdout)
log.Errorf("stderr: %s", stderr)
}
return err
},
iptRestoreTimeout*time.Second)

if err != nil {
return fmt.Errorf("unable to run iptables-restore (%s, %s): %v", stdout, stderr, err)
return fmt.Errorf("unable to run iptables-restore : %v", err)
}

return nil
}

Expand Down
Loading
Loading