diff --git a/main.go b/main.go index e8ab0626eb..38e46f30ec 100644 --- a/main.go +++ b/main.go @@ -400,10 +400,15 @@ func main() { // In Docker 1.12 and earlier, the default FORWARD chain policy was ACCEPT. // In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP. if opts.iptablesForwardRules { - trafficMngr.SetupAndEnsureForwardRules(ctx, + if err := trafficMngr.SetupAndEnsureForwardRules(ctx, config.Network, config.IPv6Network, - opts.iptablesResyncSeconds) + opts.iptablesResyncSeconds); err != nil { + log.Errorf("Failed to setup forward rules, %v", err) + cancel() + wg.Wait() + os.Exit(1) + } } if err := sm.HandleSubnetFile(opts.subnetFile, config, opts.ipMasq, bn.Lease().Subnet, bn.Lease().IPv6Subnet, bn.MTU()); err != nil { diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 17943f9184..6f79462f6b 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -15,6 +15,9 @@ package retry import ( + "context" + "time" + log "k8s.io/klog/v2" retry_v4 "github.com/avast/retry-go/v4" @@ -31,3 +34,24 @@ func Do(f func() error) error { ) } + +func DoUntil(ctx context.Context, f func() error, timeout time.Duration) error { + toctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + return retry_v4.Do(f, + retry_v4.Context(toctx), + retry_v4.OnRetry(func(n uint, err error) { + log.Errorf("#%d: %s\n", n, err) + }), + retry_v4.Attempts(0), + retry_v4.WrapContextErrorWithLastError(true)) +} + +// DoWithOptions executes f until it does not return an error +// By default, the number of attempts is 10 with increasing delay between each +// It also accepts Options from the retry package +func DoWithOptions(f func() error, opts ...retry_v4.Option) error { + + return retry_v4.Do(f, opts...) 
+ +} diff --git a/pkg/subnet/kube/kube.go b/pkg/subnet/kube/kube.go index 836455c2cb..c5beee8cdc 100644 --- a/pkg/subnet/kube/kube.go +++ b/pkg/subnet/kube/kube.go @@ -422,9 +422,6 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *lease.Lea Expiration: time.Now().Add(24 * time.Hour), } if cidr != nil && ksm.enableIPv4 { - if err != nil { - return nil, err - } if !containsCIDR(ksm.subnetConf.Network.ToIPNet(), cidr) { return nil, fmt.Errorf("subnet %q specified in the flannel net config doesn't contain %q PodCIDR of the %q node", ksm.subnetConf.Network, cidr, ksm.nodeName) } @@ -432,9 +429,6 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *lease.Lea lease.Subnet = ip.FromIPNet(cidr) } if ipv6Cidr != nil { - if err != nil { - return nil, err - } if !containsCIDR(ksm.subnetConf.IPv6Network.ToIPNet(), ipv6Cidr) { return nil, fmt.Errorf("subnet %q specified in the flannel net config doesn't contain %q IPv6 PodCIDR of the %q node", ksm.subnetConf.IPv6Network, ipv6Cidr, ksm.nodeName) } diff --git a/pkg/trafficmngr/iptables/iptables.go b/pkg/trafficmngr/iptables/iptables.go index 69d6db5e89..ec228e7768 100644 --- a/pkg/trafficmngr/iptables/iptables.go +++ b/pkg/trafficmngr/iptables/iptables.go @@ -56,7 +56,9 @@ func (iptm *IPTablesManager) Init(ctx context.Context, wg *sync.WaitGroup) error go func() { <-ctx.Done() time.Sleep(time.Second) - err := iptm.cleanUp() + cleanupCtx, cleanUpCancelFunc := context.WithTimeout(context.Background(), trafficmngr.CleanUpDeadline*time.Second) + defer cleanUpCancelFunc() + err := iptm.cleanUp(cleanupCtx) if err != nil { log.Errorf("iptables: error while cleaning-up: %v", err) } @@ -66,7 +68,7 @@ func (iptm *IPTablesManager) Init(ctx context.Context, wg *sync.WaitGroup) error return nil } -func (iptm *IPTablesManager) cleanUp() error { +func (iptm *IPTablesManager) cleanUp(ctx context.Context) error { if len(iptm.ipv4Rules) > 0 { ipt, err := iptables.New() if err != nil { @@ -79,7 +81,7 @@ 
func (iptm *IPTablesManager) cleanUp() error { return fmt.Errorf("failed to setup IPTables. iptables-restore binary was not found: %v", err) } log.Info("iptables (ipv4): cleaning-up before exiting flannel...") - err = teardownIPTables(ipt, iptRestore, iptm.ipv4Rules) + err = teardownIPTables(ctx, ipt, iptRestore, iptm.ipv4Rules) if err != nil { log.Errorf("Failed to tear down IPTables: %v", err) } @@ -96,7 +98,7 @@ func (iptm *IPTablesManager) cleanUp() error { return fmt.Errorf("failed to setup IPTables. iptables-restore binary was not found: %v", err) } log.Info("iptables (ipv6): cleaning-up before exiting flannel...") - err = teardownIPTables(ipt, iptRestore, iptm.ipv6Rules) + err = teardownIPTables(ctx, ipt, iptRestore, iptm.ipv6Rules) if err != nil { log.Errorf("Failed to tear down IPTables: %v", err) } @@ -117,14 +119,17 @@ func (iptm *IPTablesManager) SetupAndEnsureMasqRules(ctx context.Context, flanne newLease := &lease.Lease{ Subnet: prevSubnet, } - if err := iptm.deleteIP4Tables(iptm.masqRules(prevNetwork, newLease)); err != nil { + if err := iptm.deleteIP4Tables(ctx, iptm.masqRules(prevNetwork, newLease)); err != nil { return err } } log.Infof("Setting up masking rules") iptm.CreateIP4Chain("nat", "FLANNEL-POSTRTG") - go iptm.setupAndEnsureIP4Tables(ctx, iptm.masqRules(flannelIPv4Net, currentlease), resyncPeriod) + + if err := iptm.setupAndEnsureIP4Tables(ctx, iptm.masqRules(flannelIPv4Net, currentlease)); err != nil { + return err + } } if !flannelIPv6Net.Empty() { // recycle iptables rules only when network configured or subnet leased is not equal to current one. 
@@ -134,7 +139,7 @@ func (iptm *IPTablesManager) SetupAndEnsureMasqRules(ctx context.Context, flanne newLease := &lease.Lease{ IPv6Subnet: prevIPv6Subnet, } - if err := iptm.deleteIP6Tables(iptm.masqIP6Rules(prevIPv6Network, newLease)); err != nil { + if err := iptm.deleteIP6Tables(ctx, iptm.masqIP6Rules(prevIPv6Network, newLease)); err != nil { return err } } @@ -221,17 +226,20 @@ func (iptm *IPTablesManager) masqIP6Rules(ccidr ip.IP6Net, lease *lease.Lease) [ return rules } -func (iptm *IPTablesManager) SetupAndEnsureForwardRules(ctx context.Context, flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) { +func (iptm *IPTablesManager) SetupAndEnsureForwardRules(ctx context.Context, flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) error { if !flannelIPv4Network.Empty() { log.Infof("Changing default FORWARD chain policy to ACCEPT") iptm.CreateIP4Chain("filter", "FLANNEL-FWD") - go iptm.setupAndEnsureIP4Tables(ctx, iptm.forwardRules(flannelIPv4Network.String()), resyncPeriod) + if err := iptm.setupAndEnsureIP4Tables(ctx, iptm.forwardRules(flannelIPv4Network.String())); err != nil { + return err + } } if !flannelIPv6Network.Empty() { log.Infof("IPv6: Changing default FORWARD chain policy to ACCEPT") iptm.CreateIP6Chain("filter", "FLANNEL-FWD") go iptm.setupAndEnsureIP6Tables(ctx, iptm.forwardRules(flannelIPv6Network.String()), resyncPeriod) } + return nil } func (iptm *IPTablesManager) forwardRules(flannelNetwork string) []trafficmngr.IPTablesRule { @@ -355,7 +363,7 @@ func ipTablesCleanAndBuild(ipt IPTables, rules []trafficmngr.IPTablesRule) (IPTa } // ipTablesBootstrap init iptables rules using iptables-restore (with some cleaning if some rules already exists) -func ipTablesBootstrap(ipt IPTables, iptRestore IPTablesRestore, rules []trafficmngr.IPTablesRule) error { +func ipTablesBootstrap(ctx context.Context, ipt IPTables, iptRestore IPTablesRestore, rules []trafficmngr.IPTablesRule) error { tablesRules, err := 
ipTablesCleanAndBuild(ipt, rules) if err != nil { // if we can't find iptables or if we can check existing rules, give up and return @@ -364,7 +372,7 @@ func ipTablesBootstrap(ipt IPTables, iptRestore IPTablesRestore, rules []traffic log.V(6).Infof("trying to run iptables-restore < %+v", tablesRules) - err = iptRestore.ApplyWithoutFlush(tablesRules) + err = iptRestore.ApplyWithoutFlush(ctx, tablesRules) if err != nil { return fmt.Errorf("failed to apply partial iptables-restore %v", err) } @@ -374,40 +382,26 @@ func ipTablesBootstrap(ipt IPTables, iptRestore IPTablesRestore, rules []traffic return nil } -func (iptm *IPTablesManager) setupAndEnsureIP4Tables(ctx context.Context, rules []trafficmngr.IPTablesRule, resyncPeriod int) { +func (iptm *IPTablesManager) setupAndEnsureIP4Tables(ctx context.Context, rules []trafficmngr.IPTablesRule) error { ipt, err := iptables.New() if err != nil { // if we can't find iptables, give up and return - log.Errorf("Failed to setup IPTables. iptables binary was not found: %v", err) - return + return fmt.Errorf("Failed to setup IPTables. iptables binary was not found: %v", err) } iptRestore, err := NewIPTablesRestoreWithProtocol(iptables.ProtocolIPv4) if err != nil { // if we can't find iptables-restore, give up and return - log.Errorf("Failed to setup IPTables. iptables-restore binary was not found: %v", err) - return + return fmt.Errorf("Failed to setup IPTables. iptables-restore binary was not found: %v", err) } - err = ipTablesBootstrap(ipt, iptRestore, rules) + err = ipTablesBootstrap(ctx, ipt, iptRestore, rules) if err != nil { // if we can't find iptables, give up and return - log.Errorf("Failed to bootstrap IPTables: %v", err) + return fmt.Errorf("Failed to bootstrap IPTables: %v", err) } iptm.ipv4Rules = append(iptm.ipv4Rules, rules...) 
- for { - select { - case <-ctx.Done(): - //clean-up is setup in Init - return - case <-time.After(time.Duration(resyncPeriod) * time.Second): - // Ensure that all the iptables rules exist every 5 seconds - if err := ensureIPTables(ipt, iptRestore, rules); err != nil { - log.Errorf("Failed to ensure iptables rules: %v", err) - } - } - - } + return ensureIPTables(ctx, ipt, iptRestore, rules) } func (iptm *IPTablesManager) setupAndEnsureIP6Tables(ctx context.Context, rules []trafficmngr.IPTablesRule, resyncPeriod int) { @@ -424,7 +418,7 @@ func (iptm *IPTablesManager) setupAndEnsureIP6Tables(ctx context.Context, rules return } - err = ipTablesBootstrap(ipt, iptRestore, rules) + err = ipTablesBootstrap(ctx, ipt, iptRestore, rules) if err != nil { // if we can't find iptables, give up and return log.Errorf("Failed to bootstrap IPTables: %v", err) @@ -438,7 +432,7 @@ func (iptm *IPTablesManager) setupAndEnsureIP6Tables(ctx context.Context, rules return case <-time.After(time.Duration(resyncPeriod) * time.Second): // Ensure that all the iptables rules exist every 5 seconds - if err := ensureIPTables(ipt, iptRestore, rules); err != nil { + if err := ensureIPTables(ctx, ipt, iptRestore, rules); err != nil { log.Errorf("Failed to ensure iptables rules: %v", err) } } @@ -446,7 +440,7 @@ func (iptm *IPTablesManager) setupAndEnsureIP6Tables(ctx context.Context, rules } // deleteIP4Tables delete specified iptables rules -func (iptm *IPTablesManager) deleteIP4Tables(rules []trafficmngr.IPTablesRule) error { +func (iptm *IPTablesManager) deleteIP4Tables(ctx context.Context, rules []trafficmngr.IPTablesRule) error { ipt, err := iptables.New() if err != nil { // if we can't find iptables, give up and return @@ -459,7 +453,7 @@ func (iptm *IPTablesManager) deleteIP4Tables(rules []trafficmngr.IPTablesRule) e log.Errorf("Failed to setup iptables-restore: %v", err) return err } - err = teardownIPTables(ipt, iptRestore, rules) + err = teardownIPTables(ctx, ipt, iptRestore, rules) if err 
!= nil { log.Errorf("Failed to teardown iptables: %v", err) return err @@ -468,7 +462,7 @@ func (iptm *IPTablesManager) deleteIP4Tables(rules []trafficmngr.IPTablesRule) e } // deleteIP6Tables delete specified iptables rules -func (iptm *IPTablesManager) deleteIP6Tables(rules []trafficmngr.IPTablesRule) error { +func (iptm *IPTablesManager) deleteIP6Tables(ctx context.Context, rules []trafficmngr.IPTablesRule) error { ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6) if err != nil { // if we can't find iptables, give up and return @@ -482,7 +476,7 @@ func (iptm *IPTablesManager) deleteIP6Tables(rules []trafficmngr.IPTablesRule) e log.Errorf("Failed to setup iptables-restore: %v", err) return err } - err = teardownIPTables(ipt, iptRestore, rules) + err = teardownIPTables(ctx, ipt, iptRestore, rules) if err != nil { log.Errorf("Failed to teardown iptables: %v", err) return err @@ -490,7 +484,7 @@ func (iptm *IPTablesManager) deleteIP6Tables(rules []trafficmngr.IPTablesRule) e return nil } -func ensureIPTables(ipt IPTables, iptRestore IPTablesRestore, rules []trafficmngr.IPTablesRule) error { +func ensureIPTables(ctx context.Context, ipt IPTables, iptRestore IPTablesRestore, rules []trafficmngr.IPTablesRule) error { exists, err := ipTablesRulesExist(ipt, rules) if err != nil { return fmt.Errorf("error checking rule existence: %v", err) @@ -502,7 +496,7 @@ func ensureIPTables(ipt IPTables, iptRestore IPTablesRestore, rules []trafficmng // Otherwise, teardown all the rules and set them up again // We do this because the order of the rules is important log.Info("Some iptables rules are missing; deleting and recreating rules") - err = ipTablesBootstrap(ipt, iptRestore, rules) + err = ipTablesBootstrap(ctx, ipt, iptRestore, rules) if err != nil { // if we can't find iptables, give up and return return fmt.Errorf("error setting up rules: %v", err) @@ -510,7 +504,7 @@ func ensureIPTables(ipt IPTables, iptRestore IPTablesRestore, rules []trafficmng return nil } 
-func teardownIPTables(ipt IPTables, iptr IPTablesRestore, rules []trafficmngr.IPTablesRule) error { +func teardownIPTables(ctx context.Context, ipt IPTables, iptr IPTablesRestore, rules []trafficmngr.IPTablesRule) error { tablesRules := IPTablesRestoreRules{} // Build delete rules to a transaction for iptables restore @@ -547,7 +541,7 @@ func teardownIPTables(ipt IPTables, iptr IPTablesRestore, rules []trafficmngr.IP tablesRules[rule.Table] = append(tablesRules[rule.Table], append(IPTablesRestoreRuleSpec{"-D", rule.Chain}, rule.Rulespec...)) } } - err := iptr.ApplyWithoutFlush(tablesRules) // ApplyWithoutFlush make a diff, Apply make a replace (desired state) + err := iptr.ApplyWithoutFlush(ctx, tablesRules) // ApplyWithoutFlush make a diff, Apply make a replace (desired state) if err != nil { return fmt.Errorf("unable to teardown iptables: %v", err) } diff --git a/pkg/trafficmngr/iptables/iptables_restore.go b/pkg/trafficmngr/iptables/iptables_restore.go index c0a828b302..e04351f771 100644 --- a/pkg/trafficmngr/iptables/iptables_restore.go +++ b/pkg/trafficmngr/iptables/iptables_restore.go @@ -18,14 +18,17 @@ package iptables import ( "bytes" + "context" "fmt" "io" "os/exec" "regexp" "strconv" "sync" + "time" "github.com/coreos/go-iptables/iptables" + "github.com/flannel-io/flannel/pkg/retry" log "k8s.io/klog/v2" ) @@ -34,12 +37,14 @@ const ( ip6TablesRestoreCmd string = "ip6tables-restore" ipTablesCmd string = "iptables" ip6TablesCmd string = "ip6tables" + //maximum delay in seconds before iptables-restore is considered in error + iptRestoreTimeout = 15 ) // IPTablesRestore wrapper for iptables-restore type IPTablesRestore interface { // ApplyWithoutFlush apply without flush chains - ApplyWithoutFlush(rules IPTablesRestoreRules) error + ApplyWithoutFlush(ctx context.Context, rules IPTablesRestoreRules) error } // ipTablesRestore internal type @@ -86,16 +91,29 @@ func NewIPTablesRestoreWithProtocol(protocol iptables.Protocol) (IPTablesRestore } // 
ApplyWithoutFlush apply without flush chains -func (iptr *ipTablesRestore) ApplyWithoutFlush(rules IPTablesRestoreRules) error { +func (iptr *ipTablesRestore) ApplyWithoutFlush(ctx context.Context, rules IPTablesRestoreRules) error { iptr.mu.Lock() defer iptr.mu.Unlock() payload := buildIPTablesRestorePayload(rules) - log.V(6).Infof("trying to run with payload %s", payload) - stdout, stderr, err := iptr.runWithOutput([]string{"--noflush"}, bytes.NewBuffer([]byte(payload))) + log.V(6).Infof("trying to run iptables-restore with payload %s", payload) + + err := retry.DoUntil(ctx, + func() error { + stdout, stderr, err := iptr.runWithOutput([]string{"--noflush"}, bytes.NewBuffer([]byte(payload))) + if err != nil { + log.Errorf("iptables-restore finished with error: %v", err) + log.Errorf("stdout: %s", stdout) + log.Errorf("stderr: %s", stderr) + } + return err + }, + iptRestoreTimeout*time.Second) + if err != nil { - return fmt.Errorf("unable to run iptables-restore (%s, %s): %v", stdout, stderr, err) + return fmt.Errorf("unable to run iptables-restore : %v", err) } + return nil } diff --git a/pkg/trafficmngr/iptables/iptables_test.go b/pkg/trafficmngr/iptables/iptables_test.go index ab98336c0e..9bb6d6ee56 100644 --- a/pkg/trafficmngr/iptables/iptables_test.go +++ b/pkg/trafficmngr/iptables/iptables_test.go @@ -17,6 +17,7 @@ package iptables import ( + "context" "fmt" "net" "reflect" @@ -65,7 +66,7 @@ func (mock *MockIPTablesRestore) ApplyFully(rules IPTablesRestoreRules) error { return nil } -func (mock *MockIPTablesRestore) ApplyWithoutFlush(rules IPTablesRestoreRules) error { +func (mock *MockIPTablesRestore) ApplyWithoutFlush(ctx context.Context, rules IPTablesRestoreRules) error { mock.rules = append(mock.rules, rules) return nil } @@ -127,8 +128,8 @@ func TestDeleteRules(t *testing.T) { PrefixLen: 16, }, testingLease()) expectedRules := expectedTearDownIPTablesRestoreRules(baseRules) - - err := ipTablesBootstrap(ipt, iptr, baseRules) + ctx := context.TODO() + err 
:= ipTablesBootstrap(ctx, ipt, iptr, baseRules) if err != nil { t.Error("Error bootstrapping up iptables") } @@ -141,7 +142,7 @@ func TestDeleteRules(t *testing.T) { } iptr.rules = []IPTablesRestoreRules{} - err = teardownIPTables(ipt, iptr, baseRules) + err = teardownIPTables(ctx, ipt, iptr, baseRules) if err != nil { t.Error("Error tearing down iptables") } @@ -154,7 +155,7 @@ func TestDeleteRules(t *testing.T) { func TestDeleteMoreRules(t *testing.T) { ipt := &MockIPTables{} iptr := &MockIPTablesRestore{} - + ctx := context.TODO() baseRules := []trafficmngr.IPTablesRule{ {Table: "filter", Action: "-A", Chain: "INPUT", Rulespec: []string{"-s", "127.0.0.1", "-d", "127.0.0.1", "-j", "RETURN"}}, {Table: "filter", Action: "-A", Chain: "INPUT", Rulespec: []string{"-s", "127.0.0.1", "!", "-d", "224.0.0.0/4", "-j", "MASQUERADE", "--random-fully"}}, @@ -173,7 +174,7 @@ func TestDeleteMoreRules(t *testing.T) { }, } - err := ipTablesBootstrap(ipt, iptr, baseRules) + err := ipTablesBootstrap(ctx, ipt, iptr, baseRules) if err != nil { t.Error("Error bootstrapping up iptables") } @@ -186,7 +187,7 @@ func TestDeleteMoreRules(t *testing.T) { } iptr.rules = []IPTablesRestoreRules{} - err = teardownIPTables(ipt, iptr, baseRules) + err = teardownIPTables(ctx, ipt, iptr, baseRules) if err != nil { t.Error("Error tearing down iptables") } @@ -198,7 +199,7 @@ func TestDeleteMoreRules(t *testing.T) { func TestBootstrapRules(t *testing.T) { iptr := &MockIPTablesRestore{} ipt := &MockIPTables{} - + ctx := context.TODO() baseRules := []trafficmngr.IPTablesRule{ {Table: "filter", Action: "-A", Chain: "INPUT", Rulespec: []string{"-s", "127.0.0.1", "-d", "127.0.0.1", "-j", "RETURN"}}, {Table: "filter", Action: "-A", Chain: "INPUT", Rulespec: []string{"-s", "127.0.0.1", "!", "-d", "224.0.0.0/4", "-j", "MASQUERADE", "--random-fully"}}, @@ -206,7 +207,7 @@ func TestBootstrapRules(t *testing.T) { {Table: "nat", Action: "-A", Chain: "POSTROUTING", Rulespec: []string{"-s", "127.0.0.1", "!", "-d", 
"224.0.0.0/4", "-j", "MASQUERADE", "--random-fully"}}, } - err := ipTablesBootstrap(ipt, iptr, baseRules) + err := ipTablesBootstrap(ctx, ipt, iptr, baseRules) if err != nil { t.Error("Error bootstrapping up iptables") } @@ -248,7 +249,7 @@ func TestBootstrapRules(t *testing.T) { }, } // Re-run ensure has new operations - err = ipTablesBootstrap(ipt, iptr, baseRules) + err = ipTablesBootstrap(ctx, ipt, iptr, baseRules) if err != nil { t.Error("Error bootstrapping up iptables") } @@ -260,13 +261,14 @@ func TestBootstrapRules(t *testing.T) { func TestDeleteIP6Rules(t *testing.T) { ipt := &MockIPTables{} iptr := &MockIPTablesRestore{} + ctx := context.TODO() baseRules := IP6Rules(ip.IP6Net{}, testingLease()) // expect to have the same DELETE rules expectedRules := IP6RestoreDeleteRules(ip.IP6Net{}, testingLease()) - err := ipTablesBootstrap(ipt, iptr, baseRules) + err := ipTablesBootstrap(ctx, ipt, iptr, baseRules) if err != nil { t.Error("Error bootstrapping up iptables") } @@ -278,7 +280,7 @@ func TestDeleteIP6Rules(t *testing.T) { t.Errorf("Should be 4 masqRules, there are actually %d: %#v", len(ipt.rules), ipt.rules) } iptr.rules = []IPTablesRestoreRules{} - err = teardownIPTables(ipt, iptr, baseRules) + err = teardownIPTables(ctx, ipt, iptr, baseRules) if err != nil { t.Error("Error tearing down iptables") } @@ -290,6 +292,7 @@ func TestDeleteIP6Rules(t *testing.T) { func TestEnsureRules(t *testing.T) { iptr := &MockIPTablesRestore{} ipt := &MockIPTables{} + ctx := context.TODO() // Ensure iptable mock has other rules otherRules := []trafficmngr.IPTablesRule{ @@ -305,7 +308,7 @@ func TestEnsureRules(t *testing.T) { {Table: "nat", Action: "-A", Chain: "POSTROUTING", Rulespec: []string{"-s", "127.0.0.1", "!", "-d", "224.0.0.0/4", "-j", "MASQUERADE", "--random-fully"}}, } - err = ensureIPTables(ipt, iptr, baseRules) + err = ensureIPTables(ctx, ipt, iptr, baseRules) if err != nil { t.Errorf("ensureIPTables should have completed without errors") } @@ -328,7 +331,7 @@ 
func TestEnsureRules(t *testing.T) { iptr.rules = []IPTablesRestoreRules{} // Re-run ensure no new operations - err = ensureIPTables(ipt, iptr, baseRules) + err = ensureIPTables(ctx, ipt, iptr, baseRules) if err != nil { t.Errorf("ensureIPTables should have completed without errors") } @@ -340,6 +343,7 @@ func TestEnsureRules(t *testing.T) { func TestEnsureIP6Rules(t *testing.T) { iptr := &MockIPTablesRestore{} ipt := &MockIPTables{} + ctx := context.TODO() // Ensure iptable mock has other rules otherRules := []trafficmngr.IPTablesRule{ @@ -352,7 +356,7 @@ func TestEnsureIP6Rules(t *testing.T) { baseRules := IP6Rules(ip.IP6Net{}, testingLease()) - err = ensureIPTables(ipt, iptr, baseRules) + err = ensureIPTables(ctx, ipt, iptr, baseRules) if err != nil { t.Errorf("ensureIPTables should have completed without errors") } @@ -370,7 +374,7 @@ func TestEnsureIP6Rules(t *testing.T) { iptr.rules = []IPTablesRestoreRules{} // Re-run ensure no new operations - err = ensureIPTables(ipt, iptr, baseRules) + err = ensureIPTables(ctx, ipt, iptr, baseRules) if err != nil { t.Errorf("ensureIPTables should have completed without errors") } diff --git a/pkg/trafficmngr/iptables/iptables_windows.go b/pkg/trafficmngr/iptables/iptables_windows.go index c3a8d0ed0e..f05d7b0e15 100644 --- a/pkg/trafficmngr/iptables/iptables_windows.go +++ b/pkg/trafficmngr/iptables/iptables_windows.go @@ -40,7 +40,8 @@ func (iptm IPTablesManager) Init(ctx context.Context, wg *sync.WaitGroup) error return nil } -func (iptm *IPTablesManager) SetupAndEnsureForwardRules(ctx context.Context, flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) { +func (iptm *IPTablesManager) SetupAndEnsureForwardRules(ctx context.Context, flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) error { + return nil } func (iptm *IPTablesManager) SetupAndEnsureMasqRules(ctx context.Context, flannelIPv4Net, prevSubnet, prevNetwork ip.IP4Net, diff --git 
a/pkg/trafficmngr/nftables/nftables.go b/pkg/trafficmngr/nftables/nftables.go index b815c9b8d1..f870bf33af 100644 --- a/pkg/trafficmngr/nftables/nftables.go +++ b/pkg/trafficmngr/nftables/nftables.go @@ -26,6 +26,7 @@ import ( "github.com/flannel-io/flannel/pkg/ip" "github.com/flannel-io/flannel/pkg/lease" + "github.com/flannel-io/flannel/pkg/trafficmngr" "sigs.k8s.io/knftables" ) @@ -34,8 +35,6 @@ const ( ipv6Table = "flannel-ipv6" forwardChain = "forward" postrtgChain = "postrtg" - //maximum delay in second to clean-up when the context is cancelled - cleanUpDeadline = 15 ) type NFTablesManager struct { @@ -60,7 +59,7 @@ func (nftm *NFTablesManager) Init(ctx context.Context, wg *sync.WaitGroup) error <-ctx.Done() log.Info("Cleaning-up flannel tables...") - cleanupCtx, cleanUpCancelFunc := context.WithTimeout(context.Background(), cleanUpDeadline*time.Second) + cleanupCtx, cleanUpCancelFunc := context.WithTimeout(context.Background(), trafficmngr.CleanUpDeadline*time.Second) defer cleanUpCancelFunc() err := nftm.cleanUp(cleanupCtx) log.Errorf("nftables: error while cleaning-up: %v", err) @@ -90,7 +89,7 @@ func initTable(ctx context.Context, ipFamily knftables.Family, name string) (knf // It is needed when using nftables? 
accept seems to be the default // warning: never add a default 'drop' policy on the forwardChain as it breaks connectivity to the node func (nftm *NFTablesManager) SetupAndEnsureForwardRules(ctx context.Context, - flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) { + flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) error { if !flannelIPv4Network.Empty() { log.Infof("Changing default FORWARD chain policy to ACCEPT") tx := nftm.nftv4.NewTransaction() @@ -122,7 +121,7 @@ func (nftm *NFTablesManager) SetupAndEnsureForwardRules(ctx context.Context, }) err := nftm.nftv4.Run(ctx, tx) if err != nil { - log.Errorf("nftables: couldn't setup forward rules: %v", err) + return fmt.Errorf("nftables: couldn't setup forward rules: %v", err) } } if !flannelIPv6Network.Empty() { @@ -156,9 +155,10 @@ func (nftm *NFTablesManager) SetupAndEnsureForwardRules(ctx context.Context, }) err := nftm.nftv6.Run(ctx, tx) if err != nil { - log.Errorf("nftables: couldn't setup forward rules (ipv6): %v", err) + return fmt.Errorf("nftables: couldn't setup forward rules (ipv6): %v", err) } } + return nil } func (nftm *NFTablesManager) SetupAndEnsureMasqRules(ctx context.Context, flannelIPv4Net, prevSubnet, prevNetwork ip.IP4Net, diff --git a/pkg/trafficmngr/nftables/nftables_windows.go b/pkg/trafficmngr/nftables/nftables_windows.go index d6e9f1a18e..29aa19299a 100644 --- a/pkg/trafficmngr/nftables/nftables_windows.go +++ b/pkg/trafficmngr/nftables/nftables_windows.go @@ -34,7 +34,8 @@ func (nftm *NFTablesManager) Init(ctx context.Context, wg *sync.WaitGroup) error } func (nftm *NFTablesManager) SetupAndEnsureForwardRules(ctx context.Context, - flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) { + flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) error { + return nil } func (nftm *NFTablesManager) SetupAndEnsureMasqRules(ctx context.Context, flannelIPv4Net, prevSubnet, prevNetwork 
ip.IP4Net, diff --git a/pkg/trafficmngr/trafficmngr.go b/pkg/trafficmngr/trafficmngr.go index 5487320aa5..837d9495a8 100644 --- a/pkg/trafficmngr/trafficmngr.go +++ b/pkg/trafficmngr/trafficmngr.go @@ -34,7 +34,11 @@ var ( ErrUnimplemented = errors.New("unimplemented") ) -const KubeProxyMark string = "0x4000/0x4000" +const ( + KubeProxyMark string = "0x4000/0x4000" + //maximum delay in seconds to clean-up when the context is cancelled + CleanUpDeadline = 15 +) type TrafficManager interface { // Initialize the TrafficManager, including the go routine to clean-up when flanneld is closed @@ -43,7 +47,7 @@ type TrafficManager interface { // This is done for IPv4 and/or IPv6 based on whether flannelIPv4Network and flannelIPv6Network are set. // SetupAndEnsureForwardRules starts a go routine that // rewrites these rules every resyncPeriod seconds if needed - SetupAndEnsureForwardRules(ctx context.Context, flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) + SetupAndEnsureForwardRules(ctx context.Context, flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) error // Install kernel rules to setup NATing of packets sent to the flannel interface // This is done for IPv4 and/or IPv6 based on whether flannelIPv4Network and flannelIPv6Network are set. // prevSubnet,prevNetworks, prevIPv6Subnet, prevIPv6Networks are used