Skip to content

Commit

Permalink
Use complete port configs when plumbing mark rules
Browse files Browse the repository at this point in the history
Currently, a reference counting scheme is used to reference count all
individual port configs that need to be plumbed in the ingress to make
sure that in situations where a service with the same set of port
configs is getting added or removed doesn't accidentally remove the port
config plumbing if the add/remove notifications come out of order. This
same reference counting scheme is also used for plumbing the port-based
marking rules. But marking rules should not be plumbed based on that
because marks are always different for different instantiations of the
same service. So fixed the code to plumb port-based mark rules based on
the complete set of port configs, while plumbing pure port rules and
proxies based on a filter set of port configs based on the reference
count.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
  • Loading branch information
mrjana committed Sep 21, 2016
1 parent d271945 commit ab555e2
Showing 1 changed file with 76 additions and 39 deletions.
115 changes: 76 additions & 39 deletions service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,17 +380,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
}

if addService {
var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, iPorts, false); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, filteredPorts, false); err != nil {
logrus.Errorf("Failed to add ingress: %v", err)
return
}
}

logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil {
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
Expand Down Expand Up @@ -453,15 +453,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
}

var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, iPorts, true); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, filteredPorts, true); err != nil {
logrus.Errorf("Failed to delete ingress: %v", err)
}
}

if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, filteredPorts, eIP, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
}
}
Expand Down Expand Up @@ -715,33 +715,66 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error {
return nil
}

func writePortsToFile(ports []*PortConfig) (string, error) {
f, err := ioutil.TempFile("", "port_configs")
if err != nil {
return "", err
}
defer f.Close()

buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ports,
})

n, err := f.Write(buf)
if err != nil {
return "", err
}

if n < len(buf) {
return "", io.ErrShortWrite
}

return f.Name(), nil
}

func readPortsFromFile(fileName string) ([]*PortConfig, error) {
buf, err := ioutil.ReadFile(fileName)
if err != nil {
return nil, err
}

var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if err != nil {
return nil, err
}

return epRec.IngressPorts, nil
}

// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var ingressPortsFile string
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, filteredPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var (
ingressPortsFile string
filteredPortsFile string
)

if len(ingressPorts) != 0 {
f, err := ioutil.TempFile("", "port_configs")
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
return err
}
}

buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ingressPorts,
})

n, err := f.Write(buf)
if len(filteredPorts) != 0 {
var err error
filteredPortsFile, err = writePortsToFile(filteredPorts)
if err != nil {
f.Close()
return err
}

if n < len(buf) {
f.Close()
return io.ErrShortWrite
}

ingressPortsFile = f.Name()
f.Close()
}

addDelOpt := "-A"
Expand All @@ -751,7 +784,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port

cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, filteredPortsFile, eIP.String()),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
Expand All @@ -768,27 +801,29 @@ func fwMarker() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

if len(os.Args) < 7 {
if len(os.Args) < 8 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}

var ingressPorts []*PortConfig
var filteredPorts []*PortConfig
if os.Args[5] != "" {
buf, err := ioutil.ReadFile(os.Args[5])
var err error
ingressPorts, err = readPortsFromFile(os.Args[5])
if err != nil {
logrus.Errorf("Failed to read ports config file: %v", err)
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(6)
}
}

var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if os.Args[6] != "" {
var err error
filteredPorts, err = readPortsFromFile(os.Args[6])
if err != nil {
logrus.Errorf("Failed to unmarshal ports config data: %v", err)
logrus.Errorf("Failed reading filtered ports file: %v", err)
os.Exit(7)
}

ingressPorts = epRec.IngressPorts
}

vip := os.Args[2]
Expand All @@ -800,12 +835,14 @@ func fwMarker() {
addDelOpt := os.Args[4]

rules := [][]string{}
for _, iPort := range ingressPorts {
for _, iPort := range filteredPorts {
rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
}

rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
rules = append(rules, rule)
}
Expand All @@ -823,9 +860,9 @@ func fwMarker() {
}

if addDelOpt == "-A" {
eIP, subnet, err := net.ParseCIDR(os.Args[6])
eIP, subnet, err := net.ParseCIDR(os.Args[7])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[7], err)
os.Exit(9)
}

Expand Down

0 comments on commit ab555e2

Please sign in to comment.