Skip to content

Commit

Permalink
Support specifying MinIO path in MinIO synchronization scenes (#286)
Browse files Browse the repository at this point in the history
  • Loading branch information
mstmdev authored Oct 30, 2023
1 parent b0c4c66 commit 5637ae9
Show file tree
Hide file tree
Showing 18 changed files with 230 additions and 44 deletions.
4 changes: 2 additions & 2 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ func runApiClient(user *auth.User) (err error) {
if err != nil {
return err
}
if i == 0 && (!c.SyncOnce || c.Source.Path() != "source" || c.Dest.Path() != "dest") {
if i == 0 && (!c.SyncOnce || c.Source.Path().Base() != "source" || c.Dest.Path().Base() != "dest") {
return errors.New("unexpect arguments")
}
if i == 1 && (c.SyncOnce || c.Source.Path() != "source" || c.Dest.Path() != "dest") {
if i == 1 && (c.SyncOnce || c.Source.Path().Base() != "source" || c.Dest.Path().Base() != "dest") {
return errors.New("unexpect arguments")
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/gofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func executeOnce(c conf.Config, logger *logger.Logger) (exit bool, err error) {

// clear the deleted files
if c.ClearDeletedPath {
return true, logger.ErrorIf(fs.ClearDeletedFile(c.Dest.Path(), logger), "clear the deleted files error")
return true, logger.ErrorIf(fs.ClearDeletedFile(c.Dest.Path().Base(), logger), "clear the deleted files error")
}

// decrypt the specified file or directory
Expand All @@ -252,7 +252,7 @@ func executeOnce(c conf.Config, logger *logger.Logger) (exit bool, err error) {

// calculate checksum
if c.Checksum {
return true, checksum.PrintChecksum(c.Source.Path(), c.ChunkSize, c.CheckpointCount, c.ChecksumAlgorithm, logger)
return true, checksum.PrintChecksum(c.Source.Path().Base(), c.ChunkSize, c.CheckpointCount, c.ChecksumAlgorithm, logger)
}
return false, nil
}
Expand Down
9 changes: 8 additions & 1 deletion core/path.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package core

import "strings"
import (
"path/filepath"
"strings"
)

type Path struct {
origin string
Expand Down Expand Up @@ -34,6 +37,10 @@ func (p Path) String() string {
}

func (p *Path) parse() {
// maybe the remote os is different from the current os, force convert remote path to slash
if p.fsType != MinIO {
p.origin = filepath.ToSlash(filepath.Clean(p.origin))
}
p.base = p.origin

if p.fsType == MinIO {
Expand Down
27 changes: 13 additions & 14 deletions core/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
// VFS virtual file system
type VFS struct {
original string
path string
remotePath string
path Path
remotePath Path
fsType VFSType
host string
port int
Expand Down Expand Up @@ -49,23 +49,23 @@ const (
)

// Path the local file path
func (vfs *VFS) Path() string {
func (vfs *VFS) Path() Path {
return vfs.path
}

// RemotePath the remote file path
func (vfs *VFS) RemotePath() string {
func (vfs *VFS) RemotePath() Path {
return vfs.remotePath
}

// Abs returns an absolute representation of Path
func (vfs *VFS) Abs() (string, error) {
return filepath.Abs(vfs.Path())
return filepath.Abs(vfs.Path().Base())
}

// IsEmpty whether the local file path is empty
func (vfs *VFS) IsEmpty() bool {
return len(vfs.Path()) == 0
return len(vfs.Path().String()) == 0
}

// Type file system type
Expand Down Expand Up @@ -127,7 +127,7 @@ func (vfs *VFS) SSHConfig() SSHConfig {
func NewDiskVFS(path string) VFS {
vfs := VFS{
fsType: Disk,
path: filepath.Clean(path),
path: newPath(filepath.Clean(path), Disk),
original: path,
}
return vfs
Expand All @@ -149,21 +149,21 @@ func NewVFS(path string) VFS {
if strings.HasPrefix(lowerPath, remoteServerScheme+schemeDelimiter) {
// example of rs protocol to see README.md
vfs.fsType = RemoteDisk
_, vfs.host, vfs.port, vfs.path, vfs.remotePath, vfs.server, vfs.fsServer, vfs.localSyncDisabled, vfs.secure, _, err = parse(path)
_, vfs.host, vfs.port, vfs.path, vfs.remotePath, vfs.server, vfs.fsServer, vfs.localSyncDisabled, vfs.secure, _, err = parse(path, vfs.fsType)
} else if strings.HasPrefix(lowerPath, sftpServerScheme+schemeDelimiter) {
vfs.fsType = SFTP
_, vfs.host, vfs.port, vfs.path, vfs.remotePath, vfs.server, vfs.fsServer, vfs.localSyncDisabled, vfs.secure, vfs.sshConf, err = parse(path)
_, vfs.host, vfs.port, vfs.path, vfs.remotePath, vfs.server, vfs.fsServer, vfs.localSyncDisabled, vfs.secure, vfs.sshConf, err = parse(path, vfs.fsType)
} else if strings.HasPrefix(lowerPath, minIOServerScheme+schemeDelimiter) {
vfs.fsType = MinIO
_, vfs.host, vfs.port, vfs.path, vfs.remotePath, vfs.server, vfs.fsServer, vfs.localSyncDisabled, vfs.secure, _, err = parse(path)
_, vfs.host, vfs.port, vfs.path, vfs.remotePath, vfs.server, vfs.fsServer, vfs.localSyncDisabled, vfs.secure, _, err = parse(path, vfs.fsType)
}
if err != nil {
return NewEmptyVFS()
}
return vfs
}

func parse(path string) (scheme string, host string, port int, localPath string, remotePath string, isServer bool, fsServer string, localSyncDisabled bool, secure bool, sshConf SSHConfig, err error) {
func parse(path string, fsType VFSType) (scheme string, host string, port int, localPath Path, remotePath Path, isServer bool, fsServer string, localSyncDisabled bool, secure bool, sshConf SSHConfig, err error) {
parseUrl, err := url.Parse(path)
if err != nil {
return
Expand All @@ -187,9 +187,8 @@ func parse(path string) (scheme string, host string, port int, localPath string,
}
}

localPath = filepath.Clean(parseUrl.Query().Get(paramPath))
// maybe the remote os is different from the current os, force convert remote path to slash
remotePath = filepath.ToSlash(filepath.Clean(parseUrl.Query().Get(paramRemotePath)))
localPath = newPath(parseUrl.Query().Get(paramPath), Disk)
remotePath = newPath(parseUrl.Query().Get(paramRemotePath), fsType)

mode := parseUrl.Query().Get(paramMode)
if strings.ToLower(mode) == valueModeServer {
Expand Down
7 changes: 6 additions & 1 deletion driver/minio/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (c *minIODriver) Symlink(oldname, newname string) (err error) {
}

func (c *minIODriver) Remove(path string) (err error) {
path = c.trimPath(path)
return c.reconnectIfLost(func() error {
infoChan := c.client.ListObjects(c.ctx, c.bucketName, minio.ListObjectsOptions{
Recursive: true,
Expand Down Expand Up @@ -286,7 +287,7 @@ func (c *minIODriver) GetFileTime(path string) (cTime time.Time, aTime time.Time

func (c *minIODriver) WalkDir(root string, fn fs.WalkDirFunc) error {
return c.reconnectIfLost(func() error {
infoChan := c.client.ListObjects(c.ctx, c.bucketName, minio.ListObjectsOptions{Recursive: true})
infoChan := c.client.ListObjects(c.ctx, c.bucketName, minio.ListObjectsOptions{Recursive: true, Prefix: c.trimPath(root)})
for info := range infoChan {
if err := fn(info.Key, fs.FileInfoToDirEntry(newMinIOFileInfo(info)), info.Err); err != nil {
return err
Expand Down Expand Up @@ -348,3 +349,7 @@ func (c *minIODriver) fPutObject(ctx context.Context, bucketName, objectName, fi
}
return c.client.PutObject(ctx, bucketName, objectName, rate.NewReader(fileReader, c.maxTranRate, c.logger), fileSize, opts)
}

func (c *minIODriver) trimPath(path string) string {
return strings.TrimPrefix(path, "/")
}
2 changes: 2 additions & 0 deletions integration/integration_minio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ func TestIntegration_MinIO(t *testing.T) {
}{
{"gofs MinIO push", "", "run-gofs-minio-push-client.yaml", "test-gofs-minio-push.yaml"},
{"gofs MinIO pull", "", "run-gofs-minio-pull-client.yaml", "test-gofs-minio-pull.yaml"},
{"gofs MinIO push partial", "", "run-gofs-minio-push-client-partial.yaml", "test-gofs-minio-push-partial.yaml"},
{"gofs MinIO pull partial", "", "run-gofs-minio-pull-client-partial.yaml", "test-gofs-minio-pull-partial.yaml"},
}

for _, tc := range testCases {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
source: minio://127.0.0.1:9000?secure=false&remote_path=minio-bucket:/partial-pull-1
dest: ./minio-pull-client-partial/dest
log_dir: ./minio-pull-client-partial-logs/
users: minio_user|minio_pwd
sync_cron: "*/5 * * * * *"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
source: ./minio-push-client-partial/source
dest: minio://127.0.0.1:9000?secure=false&local_sync_disabled=false&path=./minio-push-client-partial/dest&remote_path=minio-bucket:/partial-push-1
log_dir: ./minio-push-client-partial-logs/
users: minio_user|minio_pwd
60 changes: 60 additions & 0 deletions integration/testdata/test/test-gofs-minio-pull-partial.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
name: test for gofs MinIO pull partial
init:
- mkdir:
source: ./minio-pull-client-partial/source
- mkdir:
source: ./minio-pull-client-partial/dest
actions:
- mkdir:
source: ./minio-data-mount/partial-pull-1
- mkdir:
source: ./minio-data-mount/partial-pull-2
- cp:
source: ./integration_minio_test.go
dest: ./minio-data-mount/partial-pull-1/integration_minio_test.go.pull.bak1
- cp:
source: ./integration_minio_test.go
dest: ./minio-data-mount/partial-pull-2/integration_minio_test.go.pull.bak1
- cp:
source: ./integration_minio_test.go
dest: ./minio-data-mount/partial-pull-1/integration_minio_test.go.pull.bak2
- rm:
source: ./minio-data-mount/partial-pull-1/integration_minio_test.go.pull.bak2
- touch:
source: ./minio-data-mount/partial-pull-1/hello.pull
- touch:
source: ./minio-data-mount/partial-pull-2/hello.pull
- mkdir:
source: ./minio-data-mount/partial-pull-1/work_pull
- mkdir:
source: ./minio-data-mount/partial-pull-2/work_pull
- sleep: 10s
- is-equal:
source: ./minio-data-mount/partial-pull-1/integration_minio_test.go.pull.bak1
dest: ./minio-pull-client-partial/dest/partial-pull-1/integration_minio_test.go.pull.bak1
expect: true
must-non-empty: true
- is-exist:
source: ./minio-pull-client-partial/dest/partial-pull-2/integration_minio_test.go.pull.bak1
expect: false
- is-exist:
source: ./minio-pull-client-partial/dest/partial-pull-1/integration_minio_test.go.pull.bak2
expect: false
- is-exist:
source: ./minio-pull-client-partial/dest/partial-pull-1/hello.pull
expect: true
- is-exist:
source: ./minio-pull-client-partial/dest/partial-pull-2/hello.pull
expect: false
- is-empty:
source: ./minio-pull-client-partial/dest/partial-pull-1/hello.pull
expect: true
- is-dir:
source: ./minio-pull-client-partial/dest/partial-pull-1/work_pull
expect: true
- is-exist:
source: ./minio-pull-client-partial/dest/partial-pull-2/work_pull
expect: false
clear:
- rm:
source: ./minio-pull-client-partial
111 changes: 111 additions & 0 deletions integration/testdata/test/test-gofs-minio-push-partial.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
name: test for gofs MinIO push partial
init:
- mkdir:
source: ./minio-push-client-partial/source
- mkdir:
source: ./minio-push-client-partial/dest
actions:
- cp:
source: ./integration_test.go
dest: ./minio-push-client-partial/source/integration_test.go.bak1
- cp:
source: ./integration_test.go
dest: ./minio-push-client-partial/source/integration_test.go.bak2
- sleep: 2s
- rm:
source: ./minio-push-client-partial/source/integration_test.go.bak2
- touch:
source: ./minio-push-client-partial/source/hello
- echo:
source: ./minio-push-client-partial/source/hello
input: Hello World
append: false
- echo:
source: ./minio-push-client-partial/source/hello
input: Bye Bye
append: true
no-newline: false
- echo:
source: ./minio-push-client-partial/source/empty1
no-newline: true
- echo:
source: ./minio-push-client-partial/source/empty2
no-newline: true
- symlink:
link: ./minio-push-client-partial/source/hello.symlink
dest: ./minio-push-client-partial/source/hello
ignore-error: false
- symlink:
link: ./minio-push-client-partial/source/hello.symlink.bak
dest: ./minio-push-client-partial/source/hello
ignore-error: false
- symlink:
link: ./minio-push-client-partial/source/not_exist.symlink
dest: ./minio-push-client-partial/source/not_exist
ignore-error: false
- sleep: 2s
- rm:
source: ./minio-push-client-partial/source/hello.symlink.bak
- sleep: 10s
- is-equal:
source: ./minio-push-client-partial/source/integration_test.go.bak1
dest: ./minio-data-mount/partial-push-1/integration_test.go.bak1
expect: true
must-non-empty: true
- is-equal:
source: ./minio-push-client-partial/source/integration_test.go.bak1
dest: ./minio-data-mount/partial-push-1/integration_test.go.bak1
expect: true
must-non-empty: true
algorithm: sha1
- is-equal:
source: ./minio-push-client-partial/source/integration_test.go.bak1
dest: ./minio-data-mount/partial-push-1/empty1
expect: false
- is-equal:
source: ./minio-push-client-partial/source/empty1
dest: ./minio-data-mount/partial-push-1/empty2
expect: true
must-non-empty: false
- is-empty:
source: ./minio-data-mount/partial-push-1/integration_test.go.bak1
expect: false
- is-exist:
source: ./minio-data-mount/partial-push-1/integration_test.go.bak1
expect: true
- is-exist:
source: ./minio-data-mount/partial-push-1/integration_test.go.bak2
expect: false
- is-dir:
source: ./minio-data-mount
expect: true
- is-dir:
source: ./minio-data-mount/partial-push-1/integration_test.go.bak1
expect: false
- hash:
algorithm: md5
source: ./minio-data-mount/partial-push-1/hello
expect: 856719c57653fa86008d49db895a5752
- hash:
algorithm: sha1
source: ./minio-data-mount/partial-push-1/hello
expect: f343874b5df87e887d85df2e790df33584463162
- rm:
source: ./minio-push-client-partial/source/integration_test.go.bak1
- sleep: 10s
- run: ../scripts/minio/remount-minio.sh
- is-exist:
source: ./minio-data-mount/partial-push-1/integration_test.go.bak1
expect: false
- is-empty:
source: ./minio-data-mount/partial-push-1/hello.symlink
expect: false
- is-empty:
source: ./minio-data-mount/partial-push-1/not_exist.symlink
expect: false
- is-exist:
source: ./minio-data-mount/partial-push-1/hello.symlink.bak
expect: false
clear:
- rm:
source: ./minio-push-client-partial
6 changes: 1 addition & 5 deletions monitor/driver_pull_client_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,8 @@ func (m *driverPullClientMonitor) waitShutdown(st *atomic.Bool, wd wait.Done) {

// sync try to sync all the files once
func (m *driverPullClientMonitor) sync() (err error) {
// source.Path() and source.RemotePath() are equivalent here, and source.RemotePath() has higher priority
source := m.syncer.Source()
path := source.RemotePath()
if len(path) == 0 {
path = source.Path()
}
path := source.RemotePath().Base()
return m.syncer.SyncOnce(path)
}

Expand Down
6 changes: 3 additions & 3 deletions monitor/fsnotify_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,20 @@ func (m *fsNotifyMonitor) Start() (wait.Wait, error) {
// execute -sync_once flag
if m.syncOnce {
wd.Done()
return wd, m.syncer.SyncOnce(source.Path())
return wd, m.syncer.SyncOnce(source.Path().Base())
}

// execute -sync_cron flag
if err := m.startCron(func() error {
return m.syncer.SyncOnce(source.Path())
return m.syncer.SyncOnce(source.Path().Base())
}); err != nil {
return nil, err
}

if !source.IsDisk() && !source.Is(core.RemoteDisk) {
return nil, errors.New("the source must be a disk or remote disk")
}
if err := m.monitor(source.Path()); err != nil {
if err := m.monitor(source.Path().Base()); err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit 5637ae9

Please sign in to comment.