From d1f0bfeb097e93ee0a650b148a785e1a875fe151 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Wed, 21 Sep 2016 12:15:14 -0700 Subject: [PATCH] Delay port redirect until packet reaches container With port redirect in the ingress path happening before ipvs in the ingess sandbox, there is a chance of 5-tuple collision in the ipvs connection table for two entirely different services have different PublishedPorts but the same TargetPort. To disambiguate the ipvs connection table, delay the port redirect from PublishedPort to TargetPort until after the loadbalancing has happened in ipvs. To be specific, perform the redirect after the packet enters the real backend container namespace. Signed-off-by: Jana Radhakrishnan --- service_linux.go | 128 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 93 insertions(+), 35 deletions(-) diff --git a/service_linux.go b/service_linux.go index e9d09fe40d..605ed407e5 100644 --- a/service_linux.go +++ b/service_linux.go @@ -26,6 +26,7 @@ import ( func init() { reexec.Register("fwmarker", fwMarker) + reexec.Register("redirecter", redirecter) } func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service { @@ -275,6 +276,12 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) { n := ep.getNetwork() eIP := ep.Iface().Address() + if n.ingress { + if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil { + logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err) + } + } + if sb.ingress { // For the ingress sandbox if this is not gateway // endpoint do nothing. @@ -390,7 +397,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P } logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts) - if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, false); err != nil { + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) return } @@ -461,7 +468,7 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po } } - if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, true); err != nil { + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) } } @@ -755,11 +762,8 @@ func readPortsFromFile(fileName string) ([]*PortConfig, error) { // Invoke fwmarker reexec routine to mark vip destined packets with // the passed firewall mark. -func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, filteredPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { - var ( - ingressPortsFile string - filteredPortsFile string - ) +func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { + var ingressPortsFile string if len(ingressPorts) != 0 { var err error @@ -769,14 +773,6 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port } } - if len(filteredPorts) != 0 { - var err error - filteredPortsFile, err = writePortsToFile(filteredPorts) - if err != nil { - return err - } - } - addDelOpt := "-A" if isDelete { addDelOpt = "-D" @@ -784,7 +780,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port cmd := &exec.Cmd{ Path: reexec.Self(), - Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, filteredPortsFile, eIP.String()), + Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()), Stdout: os.Stdout, Stderr: os.Stderr, } @@ -801,13 +797,12 @@ func fwMarker() { runtime.LockOSThread() defer runtime.UnlockOSThread() - if len(os.Args) < 8 { + if len(os.Args) < 7 { logrus.Error("invalid number of arguments..") os.Exit(1) } var ingressPorts []*PortConfig - var filteredPorts []*PortConfig if os.Args[5] != "" { var err error ingressPorts, err = readPortsFromFile(os.Args[5]) @@ -817,15 +812,6 @@ func fwMarker() { } } - if os.Args[6] != "" { - var err error - filteredPorts, err = readPortsFromFile(os.Args[6]) - if err != nil { - logrus.Errorf("Failed reading filtered ports file: %v", err) - os.Exit(7) - } - } - vip := os.Args[2] fwMark, err := strconv.ParseUint(os.Args[3], 10, 32) if err != nil { @@ -835,12 +821,6 @@ func fwMarker() { addDelOpt := os.Args[4] rules := [][]string{} - for _, iPort := range filteredPorts { - rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d", - addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) - rules = append(rules, rule) - } - for _, iPort := range ingressPorts { rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark)) @@ -860,9 +840,9 @@ func fwMarker() { } if addDelOpt == "-A" { - eIP, subnet, err := net.ParseCIDR(os.Args[7]) + eIP, subnet, err := net.ParseCIDR(os.Args[6]) if err != nil { - logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[7], err) + logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err) os.Exit(9) } @@ -889,3 +869,81 @@ func fwMarker() { } } } + +func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error { + var ingressPortsFile string + + if len(ingressPorts) != 0 { + var err error + ingressPortsFile, err = writePortsToFile(ingressPorts) + if err != nil { + return err + } + } + + cmd := &exec.Cmd{ + Path: reexec.Self(), + Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile), + Stdout: os.Stdout, + Stderr: os.Stderr, + } + + if err := cmd.Run(); err != nil { + return fmt.Errorf("reexec failed: %v", err) + } + + return nil +} + +// Redirecter reexec function. +func redirecter() { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + if len(os.Args) < 4 { + logrus.Error("invalid number of arguments..") + os.Exit(1) + } + + var ingressPorts []*PortConfig + if os.Args[3] != "" { + var err error + ingressPorts, err = readPortsFromFile(os.Args[3]) + if err != nil { + logrus.Errorf("Failed reading ingress ports file: %v", err) + os.Exit(2) + } + } + + eIP, _, err := net.ParseCIDR(os.Args[2]) + if err != nil { + logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err) + os.Exit(3) + } + + rules := [][]string{} + for _, iPort := range ingressPorts { + rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d", + eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) + rules = append(rules, rule) + } + + ns, err := netns.GetFromPath(os.Args[1]) + if err != nil { + logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err) + os.Exit(4) + } + defer ns.Close() + + if err := netns.Set(ns); err != nil { + logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err) + os.Exit(5) + } + + for _, rule := range rules { + if err := iptables.RawCombinedOutputNative(rule...); err != nil { + logrus.Errorf("setting up rule failed, %v: %v", rule, err) + os.Exit(5) + } + } +}