Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create network: use locks and reservations to solve race condition #10858

Merged
merged 3 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 30 additions & 21 deletions pkg/drivers/kic/oci/network_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ import (
// it is one octet more than the one used by KVM to avoid possible conflict
const firstSubnetAddr = "192.168.49.0"

// big enough for a cluster of 254 nodes
const defaultSubnetMask = 24

// name of the default bridge network, used to lookup the MTU (see #9528)
const dockerDefaultBridge = "bridge"

Expand Down Expand Up @@ -71,30 +68,42 @@ func CreateNetwork(ociBin string, networkName string) (net.IP, error) {
if err != nil {
klog.Warningf("failed to get mtu information from the %s's default network %q: %v", ociBin, defaultBridgeName, err)
}
// Rather than iterate through all of the valid subnets, give up at 20 to avoid a lengthy user delay for something that is unlikely to work.
// will be like 192.168.49.0/24 ,...,192.168.239.0/24
subnet, err := network.FreeSubnet(firstSubnetAddr, 10, 20)
if err != nil {
klog.Errorf("error while trying to create network: %v", err)
return nil, errors.Wrap(err, "un-retryable")
}
info.gateway, err = tryCreateDockerNetwork(ociBin, subnet.IP, defaultSubnetMask, info.mtu, networkName)
if err != nil {
return info.gateway, fmt.Errorf("failed to create network after 20 attempts")

// retry up to 5 times to create container network
for attempts, subnetAddr := 0, firstSubnetAddr; attempts < 5; attempts++ {
// Rather than iterate through all of the valid subnets, give up at 20 to avoid a lengthy user delay for something that is unlikely to work.
// will be like 192.168.49.0/24,..., 192.168.220.0/24 (in increment steps of 9)
var subnet *network.Parameters
subnet, err = network.FreeSubnet(subnetAddr, 9, 20)
if err != nil {
klog.Errorf("failed to find free subnet for %s network %s after %d attempts: %v", ociBin, networkName, 20, err)
return nil, fmt.Errorf("un-retryable: %w", err)
}
info.gateway, err = tryCreateDockerNetwork(ociBin, subnet, info.mtu, networkName)
if err == nil {
klog.Infof("%s network %s %s created", ociBin, networkName, subnet.CIDR)
return info.gateway, nil
}
// don't retry unless the error is "subnet is taken" or "gateway is taken"
if !(errors.Is(err, ErrNetworkSubnetTaken) || errors.Is(err, ErrNetworkGatewayTaken)) {
klog.Errorf("error while trying to create %s network %s %s: %v", ociBin, networkName, subnet.CIDR, err)
return nil, fmt.Errorf("un-retryable: %w", err)
}
klog.Warningf("failed to create %s network %s %s, will retry: %v", ociBin, networkName, subnet.CIDR, err)
subnetAddr = subnet.IP
}
return info.gateway, nil
return info.gateway, fmt.Errorf("failed to create %s network %s: %w", ociBin, networkName, err)
}

func tryCreateDockerNetwork(ociBin string, subnetAddr string, subnetMask int, mtu int, name string) (net.IP, error) {
gateway := net.ParseIP(subnetAddr)
gateway.To4()[3]++ // first ip for gateway
klog.Infof("attempt to create network %s/%d with subnet: %s and gateway %s and MTU of %d ...", subnetAddr, subnetMask, name, gateway, mtu)
func tryCreateDockerNetwork(ociBin string, subnet *network.Parameters, mtu int, name string) (net.IP, error) {
gateway := net.ParseIP(subnet.Gateway)
klog.Infof("attempt to create %s network %s %s with gateway %s and MTU of %d ...", ociBin, name, subnet.CIDR, subnet.Gateway, mtu)
args := []string{
"network",
"create",
"--driver=bridge",
fmt.Sprintf("--subnet=%s", fmt.Sprintf("%s/%d", subnetAddr, subnetMask)),
fmt.Sprintf("--gateway=%s", gateway),
fmt.Sprintf("--subnet=%s", subnet.CIDR),
fmt.Sprintf("--gateway=%s", subnet.Gateway),
}
if ociBin == Docker {
// options documentation https://docs.docker.com/engine/reference/commandline/network_create/#bridge-driver-options
Expand Down Expand Up @@ -125,7 +134,7 @@ func tryCreateDockerNetwork(ociBin string, subnetAddr string, subnetMask int, mt
if strings.Contains(rr.Output(), "is being used by a network interface") {
return nil, ErrNetworkGatewayTaken
}
return nil, errors.Wrapf(err, "create network %s", fmt.Sprintf("%s %s/%d", name, subnetAddr, subnetMask))
return nil, fmt.Errorf("create %s network %s %s with gateway %s and MTU of %d: %w", ociBin, name, subnet.CIDR, subnet.Gateway, mtu, err)
}
return gateway, nil
}
Expand Down
73 changes: 37 additions & 36 deletions pkg/drivers/kvm/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,61 +149,62 @@ func (d *Driver) createNetwork() error {
// It is assumed that the libvirt/kvm installation has already created this network
netd, err := conn.LookupNetworkByName(d.Network)
if err != nil {
return errors.Wrapf(err, "network %s doesn't exist", d.Network)
return errors.Wrapf(err, "%s KVM network doesn't exist", d.Network)
}
log.Debugf("found existing %s KVM network", d.Network)
if netd != nil {
_ = netd.Free()
}
defer func() { _ = netd.Free() }()

// network: private
// Only create the private network if it does not already exist
netp, err := conn.LookupNetworkByName(d.PrivateNetwork)
if err != nil {
subnet, err := network.FreeSubnet(firstSubnetAddr, 10, 20)
defer func() {
if netp != nil {
_ = netp.Free()
}
}()
if err == nil {
log.Debugf("found existing private KVM network %s", d.PrivateNetwork)
return nil
}

// retry up to 5 times to create kvm network
for attempts, subnetAddr := 0, firstSubnetAddr; attempts < 5; attempts++ {
// Rather than iterate through all of the valid subnets, give up at 20 to avoid a lengthy user delay for something that is unlikely to work.
// will be like 192.168.39.0/24,..., 192.168.248.0/24 (in increment steps of 11)
var subnet *network.Parameters
subnet, err = network.FreeSubnet(subnetAddr, 11, 20)
if err != nil {
log.Debugf("error while trying to create network: %v", err)
return errors.Wrap(err, "un-retryable")
log.Debugf("failed to find free subnet for private KVM network %s after %d attempts: %v", d.PrivateNetwork, 20, err)
return fmt.Errorf("un-retryable: %w", err)
}
// create the XML for the private network from our networkTmpl
tryNet := kvmNetwork{
Name: d.PrivateNetwork,
Parameters: *subnet,
}

// create the XML for the private network from our networkTmpl
tmpl := template.Must(template.New("network").Parse(networkTmpl))
var networkXML bytes.Buffer
if err := tmpl.Execute(&networkXML, tryNet); err != nil {
return errors.Wrap(err, "executing network template")
if err = tmpl.Execute(&networkXML, tryNet); err != nil {
return fmt.Errorf("executing private KVM network template: %w", err)
}

// define the network using our template
network, err := conn.NetworkDefineXML(networkXML.String())
var network *libvirt.Network
network, err = conn.NetworkDefineXML(networkXML.String())
if err != nil {
return errors.Wrapf(err, "defining network from xml: %s", networkXML.String())
return fmt.Errorf("defining private KVM network %s %s from xml %s: %w", d.PrivateNetwork, subnet.CIDR, networkXML.String(), err)
}

// and finally create it
log.Debugf("Trying to create network %s...", d.PrivateNetwork)
create := func() error {
if err := network.Create(); err != nil {
return err
}
active, err := network.IsActive()
if err == nil && active {
return nil
}
return errors.Errorf("retrying %v", err)
}
if err := retry.Local(create, 10*time.Second); err != nil {
return errors.Wrapf(err, "creating network %s", d.PrivateNetwork)
// and finally create & start it
log.Debugf("trying to create private KVM network %s %s...", d.PrivateNetwork, subnet.CIDR)
if err = network.Create(); err == nil {
log.Debugf("private KVM network %s %s created", d.PrivateNetwork, subnet.CIDR)
return nil
}
log.Debugf("Network %s created", d.PrivateNetwork)
log.Debugf("failed to create private KVM network %s %s, will retry: %v", d.PrivateNetwork, subnet.CIDR, err)
subnetAddr = subnet.IP
}
defer func() {
if netp != nil {
_ = netp.Free()
}
}()

return nil
return fmt.Errorf("failed to create private KVM network %s: %w", d.PrivateNetwork, err)
}

func (d *Driver) deleteNetwork() error {
Expand Down
57 changes: 54 additions & 3 deletions pkg/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ import (
"encoding/binary"
"fmt"
"net"
"sync"
"time"

"github.com/pkg/errors"
"k8s.io/klog/v2"
)

const defaultReservationPeriod = 1 * time.Minute

var (
reservedSubnets = sync.Map{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using our existing lock package?

func PathMutexSpec(path string) mutex.Spec {

https://github.com/medyagh/minikube/blob/f95d43a2c78070ab10a4e7d134ff2d0952a5ca86/pkg/util/lock/lock.go#L48

For better maintainability of the lock code: what is special about locking a network name that makes it different from the other locks in minikube?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we need to lock a specific currently free network segment (not a network name) for a caller for a short period of time:

when a call to network.FreeSubnet() is made, it checks if a subnet is taken at the system level and skips those that are;
furthermore, in case of concurrent calls to network.FreeSubnet(), it also needs to skip all those network segments that were previously returned as 'free' to other callers who, within a configurable grace period, have not yet allocated the subnet at the system level

the existing locking mechanism with lock.PathMutexSpec() takes care of concurrent requests that would want to write to the same file by making them wait for their turn

on the other hand, with the free subnet allocation, we need a mechanism that would instantly know & skip if a subnet is either already taken or 'reserved' but not yet expired, and so to move on to the next free subnet

therefore i used sync.Map that has a built-in mechanism for safe concurrent access for 'reservations' with key:subnet and value:createdAt to be able to check if the subnet reservation expired (so free to reuse)

does that make sense?


// valid private network subnets (RFC1918)
privateSubnets = []net.IPNet{
// 10.0.0.0/8
Expand All @@ -46,6 +52,11 @@ var (
}
)

// reservation marks a free private subnet as held on behalf of a caller.
// A reservation is considered in force for a reservation period counted
// from createdAt; once that period has elapsed the subnet may be handed
// out again.
type reservation struct {
// createdAt is the time at which the reservation was issued.
createdAt time.Time
}
// Parameters contains main network parameters.
type Parameters struct {
IP string // IP address of the network
Expand Down Expand Up @@ -190,10 +201,14 @@ func FreeSubnet(startSubnet string, step, tries int) (*Parameters, error) {
return nil, err
}
if !taken {
klog.Infof("using free private subnet %s: %+v", n.CIDR, n)
return n, nil
if ok := reserveSubnet(startSubnet, defaultReservationPeriod); ok {
klog.Infof("using free private subnet %s: %+v", n.CIDR, n)
return n, nil
}
klog.Infof("skipping subnet %s that is reserved: %+v", n.CIDR, n)
} else {
klog.Infof("skipping subnet %s that is taken: %+v", n.CIDR, n)
}
klog.Infof("skipping subnet %s that is taken: %+v", n.CIDR, n)
} else {
klog.Infof("skipping subnet %s that is not private", n.CIDR)
}
Expand All @@ -208,3 +223,39 @@ func FreeSubnet(startSubnet string, step, tries int) (*Parameters, error) {
}
return nil, fmt.Errorf("no free private network subnets found with given parameters (start: %q, step: %d, tries: %d)", startSubnet, step, tries)
}

// reserveSubnet returns if subnet was successfully reserved for given period:
// - false, if it already has unexpired reservation
// - true, if new reservation was created or expired one renewed
// uses sync.Map to manage reservations thread-safe
func reserveSubnet(subnet string, period time.Duration) bool {
// put 'zero' reservation{} Map value for subnet Map key
// to block other processes from concurently changing this subnet
zero := reservation{}
r, loaded := reservedSubnets.LoadOrStore(subnet, zero)
// check if there was previously issued reservation
if loaded {
// back off if previous reservation was already set to 'zero'
// as then other process is already managing this subnet concurently
if r == zero {
klog.Infof("backing off reserving subnet %s (other process is managing it!): %+v", subnet, &reservedSubnets)
return false
}
// check if previous reservation expired
createdAt := r.(reservation).createdAt
if time.Since(createdAt) < period {
// unexpired reservation: restore original createdAt value
reservedSubnets.Store(subnet, reservation{createdAt: createdAt})
klog.Infof("skipping subnet %s that has unexpired reservation: %+v", subnet, &reservedSubnets)
return false
}
// expired reservation: renew setting createdAt to now
reservedSubnets.Store(subnet, reservation{createdAt: time.Now()})
klog.Infof("reusing subnet %s that has expired reservation: %+v", subnet, &reservedSubnets)
return true
}
// new reservation
klog.Infof("reserving subnet %s for %v: %+v", subnet, period, &reservedSubnets)
reservedSubnets.Store(subnet, reservation{createdAt: time.Now()})
return true
}