Skip to content

Commit

Permalink
Enable shared filesystem support
Browse files Browse the repository at this point in the history
(cherry picked from commit 974c9db)
  • Loading branch information
dchirikov authored and derekbit committed Oct 25, 2022
1 parent 42952d5 commit 9f18bde
Showing 1 changed file with 121 additions and 37 deletions.
158 changes: 121 additions & 37 deletions provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type NodePathMapData struct {
type ConfigData struct {
NodePathMap []*NodePathMapData `json:"nodePathMap,omitempty"`
CmdTimeoutSeconds int `json:"cmdTimeoutSeconds,omitempty"`
SharedFileSystemPath string `json:"sharedFileSystemPath,omitempty"`
}

type NodePathMap struct {
Expand All @@ -83,6 +84,7 @@ type NodePathMap struct {
type Config struct {
NodePathMap map[string]*NodePathMap
CmdTimeoutSeconds int
SharedFileSystemPath string
}

func NewProvisioner(stopCh chan struct{}, kubeClient *clientset.Clientset,
Expand Down Expand Up @@ -170,6 +172,15 @@ func (p *LocalPathProvisioner) getRandomPathOnNode(node string) (string, error)
}

c := p.config
sharedFS, err := p.isSharedFilesystem()
if err != nil {
return "", err
}
if sharedFS {
// we are ignoring 'node' and returning shared FS path
return c.SharedFileSystemPath, nil
}
// we are working with local FS
npMap := c.NodePathMap[node]
if npMap == nil {
npMap = c.NodePathMap[NodeDefaultNonListedNodes]
Expand All @@ -189,22 +200,57 @@ func (p *LocalPathProvisioner) getRandomPathOnNode(node string) (string, error)
return path, nil
}

func (p *LocalPathProvisioner) isSharedFilesystem() (bool, error) {
p.configMutex.RLock()
defer p.configMutex.RUnlock()

if p.config == nil {
return false, fmt.Errorf("no valid config available")
}

c := p.config
if (c.SharedFileSystemPath != "") && (len(c.NodePathMap) != 0) {
return false, fmt.Errorf("both nodePathMap and sharedFileSystemPath are defined. Please make sure only one is in use")
}

if len(c.NodePathMap) != 0 {
return false, nil
}

if c.SharedFileSystemPath != "" {
return true, nil
}

return false, fmt.Errorf("both nodePathMap and sharedFileSystemPath are unconfigured")
}

func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v1.PersistentVolume, error) {
pvc := opts.PVC
if pvc.Spec.Selector != nil {
return nil, fmt.Errorf("claim.Spec.Selector is not supported")
node := opts.SelectedNode
sharedFS, err := p.isSharedFilesystem()
if err != nil {
return nil, err
}
for _, accessMode := range pvc.Spec.AccessModes {
if accessMode != v1.ReadWriteOnce {
return nil, fmt.Errorf("Only support ReadWriteOnce access mode")
if !sharedFS {
if pvc.Spec.Selector != nil {
return nil, fmt.Errorf("claim.Spec.Selector is not supported")
}
for _, accessMode := range pvc.Spec.AccessModes {
if accessMode != v1.ReadWriteOnce {
return nil, fmt.Errorf("Only support ReadWriteOnce access mode")
}
}
if node == nil {
return nil, fmt.Errorf("configuration error, no node was specified")
}
}
node := opts.SelectedNode
if opts.SelectedNode == nil {
return nil, fmt.Errorf("configuration error, no node was specified")
}

basePath, err := p.getRandomPathOnNode(node.Name)
nodeName := ""
if node != nil {
// This clause works only with sharedFS
nodeName = node.Name
}
basePath, err := p.getRandomPathOnNode(nodeName)
if err != nil {
return nil, err
}
Expand All @@ -213,7 +259,11 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v
folderName := strings.Join([]string{name, opts.PVC.Namespace, opts.PVC.Name}, "_")

path := filepath.Join(basePath, folderName)
logrus.Infof("Creating volume %v at %v:%v", name, node.Name, path)
if nodeName == "" {
logrus.Infof("Creating volume %v at %v", name, path)
} else {
logrus.Infof("Creating volume %v at %v:%v", name, nodeName, path)
}

storage := pvc.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
provisionCmd := []string{"/bin/sh", "/script/setup"}
Expand All @@ -222,7 +272,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v
Path: path,
Mode: *pvc.Spec.VolumeMode,
SizeInBytes: storage.Value(),
Node: node.Name,
Node: nodeName,
}); err != nil {
return nil, err
}
Expand All @@ -246,11 +296,34 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v
}
}

valueNode, ok := node.GetLabels()[KeyNode]
if !ok {
valueNode = node.Name
var nodeAffinity *v1.VolumeNodeAffinity
if sharedFS {
// If the same filesystem is mounted across all nodes, we don't need
// affinity, as path is accessible from any node
nodeAffinity = nil
} else {
valueNode, ok := node.GetLabels()[KeyNode]
if !ok {
valueNode = nodeName
}
nodeAffinity = &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: KeyNode,
Operator: v1.NodeSelectorOpIn,
Values: []string{
valueNode,
},
},
},
},
},
},
}
}

return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand All @@ -263,23 +336,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v
v1.ResourceName(v1.ResourceStorage): pvc.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)],
},
PersistentVolumeSource: pvs,
NodeAffinity: &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: KeyNode,
Operator: v1.NodeSelectorOpIn,
Values: []string{
valueNode,
},
},
},
},
},
},
},
NodeAffinity: nodeAffinity,
},
}, nil
}
Expand All @@ -293,7 +350,11 @@ func (p *LocalPathProvisioner) Delete(pv *v1.PersistentVolume) (err error) {
return err
}
if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimRetain {
logrus.Infof("Deleting volume %v at %v:%v", pv.Name, node, path)
if node == "" {
logrus.Infof("Deleting volume %v at %v", pv.Name, path)
} else {
logrus.Infof("Deleting volume %v at %v:%v", pv.Name, node, path)
}
storage := pv.Spec.Capacity[v1.ResourceName(v1.ResourceStorage)]
cleanupCmd := []string{"/bin/sh", "/script/teardown"}
if err := p.createHelperPod(ActionTypeDelete, cleanupCmd, volumeOptions{
Expand Down Expand Up @@ -326,6 +387,18 @@ func (p *LocalPathProvisioner) getPathAndNodeForPV(pv *v1.PersistentVolume) (pat
return "", "", fmt.Errorf("no path set")
}

sharedFS, err := p.isSharedFilesystem()
if err != nil {
return "", "", err
}

if sharedFS {
// We don't have affinity and can use any node
return path, "", nil
}

// Dealing with local filesystem

nodeAffinity := pv.Spec.NodeAffinity
if nodeAffinity == nil {
return "", "", fmt.Errorf("no NodeAffinity set")
Expand Down Expand Up @@ -368,7 +441,11 @@ func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string,
defer func() {
err = errors.Wrapf(err, "failed to %v volume %v", action, o.Name)
}()
if o.Name == "" || o.Path == "" || o.Node == "" {
sharedFS, err := p.isSharedFilesystem()
if err != nil {
return err
}
if o.Name == "" || o.Path == "" || (!sharedFS && o.Node == "") {
return fmt.Errorf("invalid empty name or path or node")
}
if !filepath.IsAbs(o.Path) {
Expand Down Expand Up @@ -438,7 +515,9 @@ func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string,
helperPod.Name = helperPod.Name[:HelperPodNameMaxLength]
}
helperPod.Namespace = p.namespace
helperPod.Spec.NodeName = o.Node
if o.Node != "" {
helperPod.Spec.NodeName = o.Node
}
helperPod.Spec.ServiceAccountName = p.serviceAccountName
helperPod.Spec.RestartPolicy = v1.RestartPolicyNever
helperPod.Spec.Tolerations = append(helperPod.Spec.Tolerations, lpvTolerations...)
Expand Down Expand Up @@ -478,7 +557,11 @@ func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string,
return fmt.Errorf("create process timeout after %v seconds", p.config.CmdTimeoutSeconds)
}

logrus.Infof("Volume %v has been %vd on %v:%v", o.Name, action, o.Node, o.Path)
if o.Node == "" {
logrus.Infof("Volume %v has been %vd on %v", o.Name, action, o.Path)
} else {
logrus.Infof("Volume %v has been %vd on %v:%v", o.Name, action, o.Node, o.Path)
}
return nil
}

Expand Down Expand Up @@ -534,6 +617,7 @@ func canonicalizeConfig(data *ConfigData) (cfg *Config, err error) {
err = errors.Wrapf(err, "config canonicalization failed")
}()
cfg = &Config{}
cfg.SharedFileSystemPath = data.SharedFileSystemPath
cfg.NodePathMap = map[string]*NodePathMap{}
for _, n := range data.NodePathMap {
if cfg.NodePathMap[n.Node] != nil {
Expand Down

0 comments on commit 9f18bde

Please sign in to comment.