diff --git a/main.go b/main.go index 3a03b9708c..11a2fbf85a 100644 --- a/main.go +++ b/main.go @@ -301,6 +301,7 @@ func main() { PublicIPv6: opts.publicIPv6, } // Check the default interface only if no interfaces are specified + if len(opts.iface) == 0 && len(opts.ifaceRegex) == 0 && len(opts.ifaceCanReach) == 0 { if len(opts.publicIP) > 0 { extIface, err = ipmatch.LookupExtIface(opts.publicIP, "", "", ipStack, optsPublicIP) @@ -431,8 +432,8 @@ func main() { for { select { case <-ctx.Done(): - break - case <-time.After(time.Duration(opts.iptablesResyncSeconds) * time.Second): + break + case <-time.After(time.Duration(opts.iptablesResyncSeconds) * time.Second): if err := ip.AddBlackholeV4Route(bn.Lease().Subnet.ToIPNet()); err != nil { log.Errorf("Failed to setup blackhole route, %v", err) } @@ -445,8 +446,8 @@ func main() { for { select { case <-ctx.Done(): - break - case <-time.After(time.Duration(opts.iptablesResyncSeconds) * time.Second): + break + case <-time.After(time.Duration(opts.iptablesResyncSeconds) * time.Second): if err := ip.AddBlackholeV6Route(bn.Lease().IPv6Subnet.ToIPNet()); err != nil { log.Errorf("Failed to setup blackhole route, %v", err) } diff --git a/pkg/backend/common.go b/pkg/backend/common.go index 33a5d37c86..7a966f6040 100644 --- a/pkg/backend/common.go +++ b/pkg/backend/common.go @@ -25,6 +25,7 @@ import ( type ExternalInterface struct { Iface *net.Interface + IfaceName string IfaceAddr net.IP IfaceV6Addr net.IP ExtAddr net.IP diff --git a/pkg/backend/vxlan/vxlan.go b/pkg/backend/vxlan/vxlan.go index 815a3c970c..5badc04208 100644 --- a/pkg/backend/vxlan/vxlan.go +++ b/pkg/backend/vxlan/vxlan.go @@ -121,32 +121,76 @@ func newSubnetAttrs(publicIP net.IP, publicIPv6 net.IP, vnid uint32, dev, v6Dev func (be *VXLANBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, config *subnet.Config) (backend.Network, error) { // Parse our configuration - cfg := struct { - VNI int - Port int - MTU int - GBP bool - Learning bool - DirectRouting bool - }{ + cfg, err := parseVXLANConfig(config.Backend, be.extIface.Iface.MTU) + if err != nil { + return nil, fmt.Errorf("error decoding VXLAN backend config: %w", err) + } + log.Infof("VXLAN config: VNI=%d Port=%d GBP=%v Learning=%v DirectRouting=%v", cfg.VNI, cfg.Port, cfg.GBP, cfg.Learning, cfg.DirectRouting) + + dev, v6Dev, err := createVXLANDevice(ctx, config, cfg, be.subnetMgr, be.extIface.Iface.Index, be.extIface.ExtAddr, be.extIface.ExtV6Addr) + if err != nil { + return nil, fmt.Errorf("failed to create vxlan device: %w", err) + } + + subnetAttrs, err := newSubnetAttrs(be.extIface.ExtAddr, be.extIface.ExtV6Addr, uint32(cfg.VNI), dev, v6Dev) + if err != nil { + return nil, err + } + + lease, err := be.subnetMgr.AcquireLease(ctx, subnetAttrs) + switch err { + case nil: + case context.Canceled, context.DeadlineExceeded: + return nil, err + default: + return nil, fmt.Errorf("failed to acquire lease: %v", err) + } + + // Ensure that the device has a /32 address so that no broadcast routes are created. + // This IP is just used as a source address for host to workload traffic (so + // the return path for the traffic has an address on the flannel network to use as the destination) + if err := configureDeviceIPv4IPv6(dev, v6Dev, lease, config); err != nil { + return nil, err + } + + return newNetwork(be.subnetMgr, be.extIface, dev, v6Dev, ip.IP4Net{}, lease, cfg.MTU) +} + +type VXLANConfig struct { + VNI int `json:"vni"` + Port int `json:"port"` + MTU int `json:"mtu"` + GBP bool `json:"gbp"` + Learning bool `json:"learning"` + DirectRouting bool `json:"directRouting"` +} + +func parseVXLANConfig(config json.RawMessage, defaultMTU int) (VXLANConfig, error) { + cfg := VXLANConfig{ VNI: defaultVNI, - MTU: be.extIface.Iface.MTU, + MTU: defaultMTU, } - if len(config.Backend) > 0 { - if err := json.Unmarshal(config.Backend, &cfg); err != nil { - return nil, fmt.Errorf("error decoding VXLAN backend config: %v", err) + if len(config) > 0 { + if err := json.Unmarshal(config, &cfg); err != nil { + return VXLANConfig{}, err } } - log.Infof("VXLAN config: VNI=%d Port=%d GBP=%v Learning=%v DirectRouting=%v", cfg.VNI, cfg.Port, cfg.GBP, cfg.Learning, cfg.DirectRouting) - - var dev, v6Dev *vxlanDevice - var err error + return cfg, nil +} +func createVXLANDevice(ctx context.Context, + config *subnet.Config, + cfg VXLANConfig, + subnetMgr subnet.Manager, + extIfaceID int, + extIfaceIP net.IP, + extIfaceV6IP net.IP, +) (dev, v6Dev *vxlanDevice, err error) { // When flannel is restarted, it will get the MAC address from the node annotations to set flannel.1 MAC address var hwAddr, hwAddrv6 net.HardwareAddr - macStr, macStrv6 := be.subnetMgr.GetStoredMacAddresses(ctx) + macStr, macStrv6 := subnetMgr.GetStoredMacAddresses(ctx) if macStr != "" { hwAddr, err = net.ParseMAC(macStr) if err != nil { @@ -160,8 +204,8 @@ func (be *VXLANBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, vni: uint32(cfg.VNI), name: fmt.Sprintf("flannel.%d", cfg.VNI), MTU: cfg.MTU, - vtepIndex: be.extIface.Iface.Index, - vtepAddr: be.extIface.IfaceAddr, + vtepIndex: extIfaceID, + vtepAddr: extIfaceIP, vtepPort: cfg.Port, gbp: cfg.GBP, learning: cfg.Learning, @@ -170,7 +214,7 @@ func (be *VXLANBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, dev, err = newVXLANDevice(&devAttrs) if err != nil { - return nil, err + return nil, nil, err } dev.directRouting = cfg.DirectRouting } @@ -188,8 +232,8 @@ func (be *VXLANBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, vni: uint32(cfg.VNI), name: fmt.Sprintf("flannel-v6.%d", cfg.VNI), MTU: cfg.MTU, - vtepIndex: be.extIface.Iface.Index, - vtepAddr: be.extIface.IfaceV6Addr, + vtepIndex: extIfaceID, + vtepAddr: extIfaceV6IP, vtepPort: cfg.Port, gbp: cfg.GBP, learning: cfg.Learning, @@ -197,45 +241,36 @@ func (be *VXLANBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, } v6Dev, err = newVXLANDevice(&v6DevAttrs) if err != nil { - return nil, err + return nil, nil, err } v6Dev.directRouting = cfg.DirectRouting } - subnetAttrs, err := newSubnetAttrs(be.extIface.ExtAddr, be.extIface.ExtV6Addr, uint32(cfg.VNI), dev, v6Dev) - if err != nil { - return nil, err - } - - lease, err := be.subnetMgr.AcquireLease(ctx, subnetAttrs) - switch err { - case nil: - case context.Canceled, context.DeadlineExceeded: - return nil, err - default: - return nil, fmt.Errorf("failed to acquire lease: %v", err) - } + return dev, v6Dev, nil +} - // Ensure that the device has a /32 address so that no broadcast routes are created. - // This IP is just used as a source address for host to workload traffic (so - // the return path for the traffic has an address on the flannel network to use as the destination) +func configureDeviceIPv4IPv6(dev *vxlanDevice, v6Dev *vxlanDevice, lease *lease.Lease, config *subnet.Config) error { + // Configure IPv4 if enabled if config.EnableIPv4 { if lease.Subnet.Empty() { - return nil, fmt.Errorf("failed to configure interface %s: IPv4 is enabled but the lease has no IPv4", dev.link.Attrs().Name) + return fmt.Errorf("failed to configure interface %s: IPv4 is enabled but the lease has no IPv4", dev.link.Attrs().Name) } if err := dev.Configure(ip.IP4Net{IP: lease.Subnet.IP, PrefixLen: 32}, config.Network); err != nil { - return nil, fmt.Errorf("failed to configure interface %s: %w", dev.link.Attrs().Name, err) + return fmt.Errorf("failed to configure interface %s: %w", dev.link.Attrs().Name, err) } } + + // Configure IPv6 if enabled if config.EnableIPv6 { if lease.IPv6Subnet.Empty() { - return nil, fmt.Errorf("failed to configure interface %s: IPv6 is enabled but the lease has no IPv6", v6Dev.link.Attrs().Name) + return fmt.Errorf("failed to configure interface %s: IPv6 is enabled but the lease has no IPv6", v6Dev.link.Attrs().Name) } if err := v6Dev.ConfigureIPv6(ip.IP6Net{IP: lease.IPv6Subnet.IP, PrefixLen: 128}, config.IPv6Network); err != nil { - return nil, fmt.Errorf("failed to configure interface %s: %w", v6Dev.link.Attrs().Name, err) + return fmt.Errorf("failed to configure interface %s: %w", v6Dev.link.Attrs().Name, err) } } - return newNetwork(be.subnetMgr, be.extIface, dev, v6Dev, ip.IP4Net{}, lease, cfg.MTU) + + return nil } // So we can make it JSON (un)marshalable diff --git a/pkg/backend/vxlan/vxlan_network.go b/pkg/backend/vxlan/vxlan_network.go index 4d8698510a..954e4ed56c 100644 --- a/pkg/backend/vxlan/vxlan_network.go +++ b/pkg/backend/vxlan/vxlan_network.go @@ -19,9 +19,11 @@ package vxlan import ( "context" "encoding/json" + "fmt" "net" "sync" "syscall" + "time" "github.com/flannel-io/flannel/pkg/backend" "github.com/flannel-io/flannel/pkg/ip" @@ -29,6 +31,7 @@ import ( "github.com/flannel-io/flannel/pkg/retry" "github.com/flannel-io/flannel/pkg/subnet" "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" log "k8s.io/klog/v2" ) @@ -60,29 +63,191 @@ func newNetwork(subnetMgr subnet.Manager, extIface *backend.ExternalInterface, d } func (nw *network) Run(ctx context.Context) { - wg := sync.WaitGroup{} + var wg sync.WaitGroup log.V(0).Info("watching for new subnet leases") - events := make(chan []lease.Event) + leaseEvents := make(chan []lease.Event) + vxlanMissingChan := make(chan bool, 1) // buffered to avoid blocking + wg.Add(1) go func() { - subnet.WatchLeases(ctx, nw.subnetMgr, nw.SubnetLease, events) + subnet.WatchLeases(ctx, nw.subnetMgr, nw.SubnetLease, leaseEvents) log.V(1).Info("WatchLeases exited") wg.Done() }() + wg.Add(1) + go func() { + nw.watchVXLANDevice(ctx, vxlanMissingChan) + log.V(1).Info("WatchVXLANDevice exited") + wg.Done() + }() + defer wg.Wait() for { - evtBatch, ok := <-events - if !ok { - log.Infof("evts chan closed") + select { + case evtBatch, ok := <-leaseEvents: + if !ok { + log.Infof("leaseEvents chan closed") + return + } + nw.handleSubnetEvents(evtBatch) + + case _, ok := <-vxlanMissingChan: + if !ok { + log.Infof("vxlanMissingChan closed") + return + } + log.Info("vxlan device missing, attempting to recreate...") + + // Offload recreate so this loop doesn’t block handleSubnetEvents + go func() { + if err := nw.reCreateVxlan(ctx); err != nil { + log.Errorf("failed to recreate vxlan: %v", err) + } + }() + } + } +} + +func (nw *network) watchVXLANDevice(ctx context.Context, vxlanMissingChan chan<- bool) { + log.Info("starting vxlan device watcher") + if nw.dev == nil { + log.Error("vxlan device is nil, cannot watch for events") + return + } + + updates := make(chan netlink.LinkUpdate) + done := make(chan struct{}) + + if err := netlink.LinkSubscribe(updates, done); err != nil { + log.Fatalf("failed to subscribe to netlink: %v", err) + } + defer close(done) + + name := nw.dev.link.Attrs().Name + defer close(vxlanMissingChan) + for { + select { + case <-ctx.Done(): + log.Info("stopping vxlan device watcher") return + + case update := <-updates: + if update.Attrs() == nil { + continue + } + // Detect deletion + if update.Attrs().Name == name && update.Header.Type == unix.RTM_DELLINK { + log.Infof("Interface %s deleted", name) + select { + case vxlanMissingChan <- true: + default: + // Skip if signal already queued + } + } } - nw.handleSubnetEvents(evtBatch) } } +func (nw *network) reCreateVxlan(ctx context.Context) error { + backoff := time.Second + maxBackoff := 30 * time.Second + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context canceled, stopping vxlan recreate") + default: + } + + extIface, _ := net.InterfaceByName(nw.ExtIface.IfaceName) + if extIface == nil { + log.Infof("external interface %s not found, retrying in %s", nw.ExtIface.IfaceName, backoff) + retryAfterBackoff(&backoff, maxBackoff) + continue + } + + config, err := nw.subnetMgr.GetNetworkConfig(ctx) + if err != nil { + log.Errorf("failed to get network config: %v", err) + retryAfterBackoff(&backoff, maxBackoff) + continue + } + + cfg, err := parseVXLANConfig(config.Backend, extIface.MTU) + if err != nil { + log.Errorf("failed to parse vxlan config: %v", err) + retryAfterBackoff(&backoff, maxBackoff) + continue + } + + var ifaceAddrs, ifaceAddrsV6 []net.IP + + if config.EnableIPv4 { + ifaceAddrs, err = ip.GetInterfaceIP4Addrs(extIface) + if err != nil { + log.Errorf("error getting IPv4 addresses for %s: %v", extIface.Name, err) + retryAfterBackoff(&backoff, maxBackoff) + continue + } + if len(ifaceAddrs) == 0 { + log.Warningf("no IPv4 addresses found for interface %s, retrying", extIface.Name) + retryAfterBackoff(&backoff, maxBackoff) + continue + } + } + + if config.EnableIPv6 { + ifaceAddrsV6, err = ip.GetInterfaceIP6Addrs(extIface) + if err != nil { + log.Errorf("error getting IPv6 addresses for %s: %v", extIface.Name, err) + retryAfterBackoff(&backoff, maxBackoff) + continue + } + if len(ifaceAddrsV6) == 0 { + log.Warningf("no IPv6 addresses found for interface %s, retrying", extIface.Name) + retryAfterBackoff(&backoff, maxBackoff) + continue + } + } + + // Create the VXLAN device + dev, v6Dev, err := createVXLANDevice(ctx, config, cfg, nw.subnetMgr, extIface.Index, ifaceAddrs[0], ifaceAddrsV6[0]) + if err != nil { + log.Errorf("failed to create vxlan device: %v", err) + retryAfterBackoff(&backoff, maxBackoff) + continue + } + + if err := configureDeviceIPv4IPv6(dev, v6Dev, nw.SubnetLease, config); err != nil { + log.Errorf("failed to configure vxlan device: %v", err) + retryAfterBackoff(&backoff, maxBackoff) + continue + } + + nw.dev = dev + nw.v6Dev = v6Dev + nw.mtu = dev.link.Attrs().MTU + log.Infof("VXLAN device %s recreated successfully", dev.link.Attrs().Name) + return nil + } +} + +func retryAfterBackoff(backoff *time.Duration, maxBackoff time.Duration) { + time.Sleep(*backoff) + *backoff = minDuration(*backoff*2, maxBackoff) +} + +// helper to cap exponential backoff +func minDuration(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b +} + func (nw *network) MTU() int { return nw.mtu - encapOverhead } diff --git a/pkg/ipmatch/match.go b/pkg/ipmatch/match.go index 8682e10351..ce806ba755 100644 --- a/pkg/ipmatch/match.go +++ b/pkg/ipmatch/match.go @@ -302,6 +302,7 @@ func LookupExtIface(ifname string, ifregexS string, ifcanreach string, ipStack i return &backend.ExternalInterface{ Iface: iface, + IfaceName: iface.Name, IfaceAddr: ifaceAddr, IfaceV6Addr: ifaceV6Addr, ExtAddr: extAddr,