[WIP] Fix download and upload large file bugs #321

Merged
merged 12 commits into from
Aug 21, 2017
Changes from 7 commits
67 changes: 26 additions & 41 deletions go/filemanager/pfsmodules/chunk.go
@@ -3,23 +3,24 @@ package pfsmodules
import (
"errors"
"fmt"
"io"
"net/url"
"os"
"strconv"

log "github.com/golang/glog"
)

// Chunk represents a chunk info.
type Chunk struct {
// ChunkParam represents a chunk info.
type ChunkParam struct {
Path string
Offset int64
Size int64
}

// ToString packs info of ChunkParam.
func (p *ChunkParam) ToString() string {
return fmt.Sprintf("Path:%s Offset:%d Size:%d", p.Path, p.Offset, p.Size)
}

// ToURLParam encodes variables to url encoding parameters.
func (p *Chunk) ToURLParam() url.Values {
func (p *ChunkParam) ToURLParam() url.Values {
Collaborator

Not a requirement, but it seems a lot of effort to write ToURLParam and ParseChunkParam manually for each type that we want to pass over the wire. If we use json, the json encoder / decoder can do these automatically for us, and always correctly.

Collaborator Author

Here is the reasoning: the standard practice for a GET method in a REST API is to send no body at all, so the URL alone is enough to make the request. Within the URL, URL parameters are by far the most common choice, and it seems rare to encode a JSON string into URL parameters.
Is my understanding correct?

Collaborator

@helinwang helinwang Aug 17, 2017

Thanks! Understood, it does need to be converted to URL parameters. Consider writing functions similar to json.Marshal and json.Unmarshal that automatically encode / decode a struct through reflection. I searched and did not find an open-source one; writing it ourselves should not be hard. This repo is somewhat related, but it seems to do a lot of extra things; it can serve as a reference: /~https://github.com/tonto/qparams
This is not a merge blocker for this PR.

Collaborator Author

Yes. I have wanted to do this for a while, but mainline work kept pushing it back.
Thinking about it now, though: since there is no good open-source equivalent of json.Unmarshal for this, would building one earn our PaddlePaddle a few more stars?
Sounds interesting; I will work on it over the weekend.

Collaborator

@helinwang helinwang Aug 18, 2017

@gongweibao Nice! If you want other people to be able to use it, a small standalone repo is more convenient (if it lives inside paddle, the dependency is too heavy for anyone who wants to use it). It is quite self-contained, so consider creating your own repo to open-source it :)

parameters := url.Values{}
parameters.Add("path", p.Path)

@@ -32,11 +33,11 @@ func (p *Chunk) ToURLParam() url.Values {
return parameters
}
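
The review thread on ToURLParam suggests a reflection-based pair of functions, in the spirit of json.Marshal and json.Unmarshal, that would encode and decode a struct as URL parameters. A minimal sketch of that idea follows. The names MarshalURL and UnmarshalURL and the `url` struct tags are assumptions made for illustration; they are not part of this PR, and only string and int64 fields are handled.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"reflect"
	"strconv"
)

// ChunkParam mirrors the struct in this diff. The `url` tags are an
// assumption added for this sketch.
type ChunkParam struct {
	Path   string `url:"path"`
	Offset int64  `url:"offset"`
	Size   int64  `url:"chunksize"`
}

// MarshalURL (hypothetical) encodes the tagged string and int64 fields of a
// struct, or of a pointer to a struct, into url.Values.
func MarshalURL(v interface{}) (url.Values, error) {
	rv := reflect.Indirect(reflect.ValueOf(v))
	if rv.Kind() != reflect.Struct {
		return nil, fmt.Errorf("MarshalURL: expected struct, got %s", rv.Kind())
	}
	rt := rv.Type()
	out := url.Values{}
	for i := 0; i < rt.NumField(); i++ {
		key := rt.Field(i).Tag.Get("url")
		if key == "" {
			continue // untagged fields are skipped
		}
		f := rv.Field(i)
		switch f.Kind() {
		case reflect.String:
			out.Set(key, f.String())
		case reflect.Int64:
			out.Set(key, strconv.FormatInt(f.Int(), 10))
		default:
			return nil, fmt.Errorf("MarshalURL: unsupported kind %s for %q", f.Kind(), key)
		}
	}
	return out, nil
}

// UnmarshalURL (hypothetical) fills the tagged fields of the struct that v
// points to from q. A missing or malformed int64 parameter is an error.
func UnmarshalURL(q url.Values, v interface{}) error {
	rv := reflect.ValueOf(v)
	if rv.Kind() != reflect.Ptr || rv.IsNil() || rv.Elem().Kind() != reflect.Struct {
		return errors.New("UnmarshalURL: expected non-nil pointer to struct")
	}
	rv = rv.Elem()
	rt := rv.Type()
	for i := 0; i < rt.NumField(); i++ {
		key := rt.Field(i).Tag.Get("url")
		if key == "" {
			continue
		}
		f := rv.Field(i)
		switch f.Kind() {
		case reflect.String:
			f.SetString(q.Get(key))
		case reflect.Int64:
			n, err := strconv.ParseInt(q.Get(key), 10, 64)
			if err != nil {
				return fmt.Errorf("UnmarshalURL: field %q: %v", key, err)
			}
			f.SetInt(n)
		default:
			return fmt.Errorf("UnmarshalURL: unsupported kind %s for %q", f.Kind(), key)
		}
	}
	return nil
}

func main() {
	in := ChunkParam{Path: "/pfs/datacenter1/1.txt", Offset: 4096, Size: 4096}
	q, err := MarshalURL(&in)
	if err != nil {
		panic(err)
	}
	fmt.Println(q.Encode())

	var out ChunkParam
	if err := UnmarshalURL(q, &out); err != nil {
		panic(err)
	}
	fmt.Println(out == in)
}
```

With helpers like these, each wire type would only need struct tags rather than a hand-written ToURLParam and parser, which is the maintenance cost the reviewer raised.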

// ParseChunk get a Chunk struct from path.
// ParseChunkParam get a Chunk struct from path.
// path example:
// path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096
func ParseChunk(path string) (*Chunk, error) {
cmd := Chunk{}
func ParseChunkParam(path string) (*ChunkParam, error) {
cmd := ChunkParam{}

m, err := url.ParseQuery(path)
if err != nil ||
@@ -61,38 +62,22 @@ func ParseChunk(path string) (*Chunk, error) {
return &cmd, nil
}
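
The body of ParseChunkParam is mostly hidden between hunks, so here is a rough sketch of how a query string in the documented form `path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096` might be parsed. The function name parseChunkParam, the error messages, and the validation rules are assumptions for illustration, not the PR's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strconv"
)

// ChunkParam mirrors the struct defined in this diff.
type ChunkParam struct {
	Path   string
	Offset int64
	Size   int64
}

// parseChunkParam is a hypothetical stand-in for ParseChunkParam. It accepts
// a raw query string and requires path, offset, and chunksize to be present.
func parseChunkParam(query string) (*ChunkParam, error) {
	m, err := url.ParseQuery(query)
	if err != nil {
		return nil, err
	}

	path := m.Get("path")
	if path == "" {
		return nil, errors.New("missing path")
	}

	// ParseInt fails on an empty string, so a missing parameter is rejected.
	offset, err := strconv.ParseInt(m.Get("offset"), 10, 64)
	if err != nil || offset < 0 {
		return nil, errors.New("bad offset")
	}

	size, err := strconv.ParseInt(m.Get("chunksize"), 10, 64)
	if err != nil || size <= 0 {
		return nil, errors.New("bad chunksize")
	}

	return &ChunkParam{Path: path, Offset: offset, Size: size}, nil
}

func main() {
	p, err := parseChunkParam("path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096")
	if err != nil {
		panic(err)
	}
	fmt.Printf("Path:%s Offset:%d Size:%d\n", p.Path, p.Offset, p.Size)
}
```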

// LoadChunkData loads a specified chunk to io.Writer.
func (p *Chunk) LoadChunkData(w io.Writer) error {
f, err := os.Open(p.Path)
if err != nil {
return err
}
defer Close(f)

_, err = f.Seek(p.Offset, 0)
if err != nil {
return err
}

loaded, err := io.CopyN(w, f, p.Size)
log.V(2).Infof("loaded:%d\n", loaded)
return err
// Chunk is struct.
type Chunk struct {
Offset int64
Len int64
Checksum string
Collaborator

@helinwang helinwang Aug 16, 2017

ChunkMeta already has Offset, Len, and Checksum; do we still need them here in Chunk?
If we remove them, only []byte is left, so maybe instead of defining a new type we can just use []byte to represent Chunk?

Collaborator Author

Makes sense. Would this be better:

type Chunk struct {
Meta ChunkMeta
data []byte
}

The struct is still needed when reading data.

Collaborator

Nice!

Data []byte
}
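
The layout proposed in the review thread, a Chunk that embeds its metadata next to the data buffer, could look roughly like the sketch below. The Fill method is a hypothetical helper added for illustration, and MD5 is used only because the removed GetChunkMeta code in this PR hashes chunks with md5.Sum.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// ChunkMeta carries the fields shown in chunkmeta.go (tags omitted here).
type ChunkMeta struct {
	Offset   int64
	Checksum string
	Len      int64
}

// Chunk groups the metadata with the data buffer, as proposed in the thread.
type Chunk struct {
	Meta ChunkMeta
	Data []byte
}

// NewChunk allocates a Chunk whose buffer holds up to capacity bytes.
func NewChunk(capacity int64) *Chunk {
	return &Chunk{Data: make([]byte, capacity)}
}

// Fill (hypothetical) copies src into the buffer and records the offset,
// length, and MD5 checksum of the bytes actually copied.
func (c *Chunk) Fill(offset int64, src []byte) {
	n := copy(c.Data, src)
	sum := md5.Sum(c.Data[:n])
	c.Meta = ChunkMeta{
		Offset:   offset,
		Checksum: hex.EncodeToString(sum[:]),
		Len:      int64(n),
	}
}

func main() {
	c := NewChunk(8)
	c.Fill(4096, []byte("hello"))
	fmt.Printf("Offset:%d Len:%d Checksum:%s\n", c.Meta.Offset, c.Meta.Len, c.Meta.Checksum)
}
```

Keeping the metadata in one nested field avoids duplicating Offset, Len, and Checksum across two types, which was the reviewer's concern.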

// SaveChunkData save data from io.Reader.
func (p *Chunk) SaveChunkData(r io.Reader) error {
f, err := os.OpenFile(p.Path, os.O_WRONLY, 0600)
if err != nil {
return err
}
defer Close(f)

_, err = f.Seek(p.Offset, 0)
if err != nil {
return err
}
// ToString packs info of Chunk
Collaborator

ToString -> String

Collaborator Author

Done.

func (c *Chunk) ToString() string {
return fmt.Sprintf("Offset:%d Len:%d Checksum:%s", c.Offset, c.Len, c.Checksum)
}

writen, err := io.CopyN(f, r, p.Size)
log.V(2).Infof("chunksize:%d writen:%d\n", p.Size, writen)
return err
// NewChunk make a Chunk struct.
func NewChunk(capcity int64) *Chunk {
c := Chunk{}
c.Data = make([]byte, capcity)
return &c
}
146 changes: 56 additions & 90 deletions go/filemanager/pfsmodules/chunkmeta.go
@@ -1,17 +1,15 @@
package pfsmodules

import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"sort"
"strconv"

"github.com/PaddlePaddle/cloud/go/utils/restclient"
)

const (
@@ -30,10 +28,16 @@ type ChunkMeta struct {
Len int64 `json:"len"`
}

// ToString packs an info string of ChunkMeta.
Collaborator

@helinwang helinwang Aug 17, 2017

ToString -> String
Consider searching the code for the string ToString; wherever appropriate it can be renamed to String.

Collaborator Author

My bad. Done.

func (m *ChunkMeta) ToString() string {
Collaborator

Not a requirement, but could consider using the Stringer interface here: https://tour.golang.org/methods/17 .

Collaborator Author

@gongweibao gongweibao Aug 17, 2017

It should be renamed to String(). Done.

return fmt.Sprintf("Offset:%d Checksum:%s Len:%d", m.Offset, m.Checksum, m.Len)
}
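
The rename from ToString to String() discussed in the review matters because fmt looks for the fmt.Stringer interface. A small sketch of the effect, using a trimmed copy of ChunkMeta with made-up field values:

```go
package main

import "fmt"

// ChunkMeta is a trimmed copy of the struct in this diff.
type ChunkMeta struct {
	Offset   int64
	Checksum string
	Len      int64
}

// String implements fmt.Stringer, so the fmt package calls it automatically.
func (m *ChunkMeta) String() string {
	return fmt.Sprintf("Offset:%d Checksum:%s Len:%d", m.Offset, m.Checksum, m.Len)
}

func main() {
	m := &ChunkMeta{Offset: 0, Checksum: "abc", Len: 3}

	// No explicit method call is needed once the method is named String.
	fmt.Println(m) // prints "Offset:0 Checksum:abc Len:3"

	// The receiver is a pointer, so only *ChunkMeta satisfies fmt.Stringer;
	// printing the value *m falls back to the default struct formatting.
	fmt.Println(*m)
}
```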

// ChunkMetaCmd is a command.
type ChunkMetaCmd struct {
Method string `json:"method"`
FilePath string `json:"path"`
Offset int64 `json:"offset"`
ChunkSize int64 `json:"chunksize"`
}

@@ -46,6 +50,9 @@ func (p *ChunkMetaCmd) ToURLParam() url.Values {
str := fmt.Sprint(p.ChunkSize)
parameters.Add("chunksize", str)

str = fmt.Sprint(p.Offset)
parameters.Add("offset", str)

return parameters
}

@@ -56,7 +63,23 @@ func (p *ChunkMetaCmd) ToJSON() ([]byte, error) {

// Run is a function which runs ChunkMetaCmd.
func (p *ChunkMetaCmd) Run() (interface{}, error) {
Collaborator

Just curious, is ChunkMetaCmd a command that we can run from the CLI?

Collaborator Author

There is indeed no such command in the CLI. It implements the Command interface, so I named it this way to distinguish it from ChunkMeta.
It is mainly used for the network communication of ChunkMeta, and its fields also differ from ChunkMeta's.
@helinwang do you have a better naming suggestion?

return GetChunkMeta(p.FilePath, p.ChunkSize)
f := FileHandle{}
if err := f.Open(p.FilePath, os.O_RDONLY, 0); err != nil {
return nil, err
}

defer f.Close()

c, err := f.ReadChunk(p.Offset, p.ChunkSize)
Collaborator

This does not actually read the chunk (its contents); it only reads the meta. Consider renaming it to LoadMeta?

Collaborator Author

Added the interface to FileHandle. Done.

if err != nil {
return nil, err
}

return &ChunkMeta{
Offset: p.Offset,
Checksum: c.Checksum,
Len: c.Len,
}, nil
}
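
The reviewer's point on Run is that only metadata is needed here, not the chunk contents. The FileHandle implementation is not shown in this diff, so the following is only a sketch of one way a metadata-only load could work: it streams the byte range through an MD5 hash instead of keeping the data. The names loadChunkMeta and metaOfBytes are hypothetical.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
)

// ChunkMeta is a trimmed copy of the struct in this diff.
type ChunkMeta struct {
	Offset   int64
	Checksum string
	Len      int64
}

// loadChunkMeta (hypothetical) hashes up to size bytes starting at offset
// without retaining them. An *os.File satisfies io.ReaderAt, so a real file
// could be passed directly.
func loadChunkMeta(r io.ReaderAt, offset, size int64) (*ChunkMeta, error) {
	if offset < 0 || size <= 0 {
		return nil, fmt.Errorf("invalid range: offset=%d size=%d", offset, size)
	}
	h := md5.New()
	n, err := io.Copy(h, io.NewSectionReader(r, offset, size))
	if err != nil {
		return nil, err
	}
	if n == 0 {
		return nil, io.EOF // offset is at or past the end of the data
	}
	return &ChunkMeta{
		Offset:   offset,
		Checksum: hex.EncodeToString(h.Sum(nil)),
		Len:      n,
	}, nil
}

// metaOfBytes is a convenience wrapper for in-memory data, used by the demo.
func metaOfBytes(data []byte, offset, size int64) (*ChunkMeta, error) {
	return loadChunkMeta(bytes.NewReader(data), offset, size)
}

func main() {
	m, err := metaOfBytes([]byte("hello world"), 0, 5)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Offset:%d Checksum:%s Len:%d\n", m.Offset, m.Checksum, m.Len)
}
```

The last chunk of a file is usually shorter than the requested size, which is why Len is taken from the number of bytes actually hashed.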

func (p *ChunkMetaCmd) checkChunkSize() error {
@@ -87,11 +110,13 @@ func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) {
method := r.URL.Query().Get("method")
path := r.URL.Query().Get("path")
chunkStr := r.URL.Query().Get("chunksize")
offsetStr := r.URL.Query().Get("offset")

if len(method) == 0 ||
method != ChunkMetaCmdName ||
len(path) == 0 ||
len(chunkStr) == 0 {
len(chunkStr) == 0 ||
len(offsetStr) == 0 {
return nil, errors.New(http.StatusText(http.StatusBadRequest))
}

@@ -100,105 +125,46 @@ func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) {
return nil, errors.New(StatusBadChunkSize)
}

offset, err := strconv.ParseInt(offsetStr, 10, 64)
if err != nil {
return nil, errors.New(StatusBadChunkSize)
}

return &ChunkMetaCmd{
Method: method,
FilePath: path,
ChunkSize: chunkSize,
Offset: offset,
}, nil
}

type metaSlice []ChunkMeta

func (a metaSlice) Len() int { return len(a) }
func (a metaSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a metaSlice) Less(i, j int) bool { return a[i].Offset < a[j].Offset }

// GetDiffChunkMeta gets difference between srcMeta and dstMeta.
func GetDiffChunkMeta(srcMeta []ChunkMeta, dstMeta []ChunkMeta) ([]ChunkMeta, error) {
if len(dstMeta) == 0 || len(srcMeta) == 0 {
return srcMeta, nil
}

if !sort.IsSorted(metaSlice(srcMeta)) {
sort.Sort(metaSlice(srcMeta))
}

if !sort.IsSorted(metaSlice(dstMeta)) {
sort.Sort(metaSlice(dstMeta))
}

dstIdx := 0
srcIdx := 0
diff := make([]ChunkMeta, 0, len(srcMeta))

for {
if srcMeta[srcIdx].Offset < dstMeta[dstIdx].Offset {
diff = append(diff, srcMeta[srcIdx])
srcIdx++
} else if srcMeta[srcIdx].Offset > dstMeta[dstIdx].Offset {
dstIdx++
} else {
if srcMeta[srcIdx].Checksum != dstMeta[dstIdx].Checksum {
diff = append(diff, srcMeta[srcIdx])
}

dstIdx++
srcIdx++
}

if dstIdx >= len(dstMeta) {
break
}

if srcIdx >= len(srcMeta) {
break
}
}

if srcIdx < len(srcMeta) {
diff = append(diff, srcMeta[srcIdx:]...)
func remoteChunkMeta(path string, offset int64,
chunkSize int64) (*ChunkMeta, error) {
cmd := ChunkMetaCmd{
Method: ChunkMetaCmdName,
FilePath: path,
ChunkSize: chunkSize,
Offset: offset,
}

return diff, nil
}

// GetChunkMeta gets chunk metas from path of file.
func GetChunkMeta(path string, len int64) ([]ChunkMeta, error) {
f, err := os.Open(path)
t := fmt.Sprintf("%s/api/v1/pfs/chunks", Config.ActiveConfig.Endpoint)
ret, err := restclient.GetCall(t, cmd.ToURLParam())
if err != nil {
return nil, err
}
defer Close(f)

if len > defaultMaxChunkSize || len < defaultMinChunkSize {
return nil, errors.New(StatusBadChunkSize)
type chunkMetaResponse struct {
Err string `json:"err"`
Results ChunkMeta `json:"results"`
}

var metas []ChunkMeta

data := make([]byte, len)
offset := int64(0)

for {
n, err := f.Read(data)
if err != nil && err != io.EOF {
return nil, err
}

if err == io.EOF {
break
}

m := ChunkMeta{}
m.Offset = offset
sum := md5.Sum(data[:n])
m.Checksum = hex.EncodeToString(sum[:])
m.Len = int64(n)

metas = append(metas, m)
resp := chunkMetaResponse{}
if err := json.Unmarshal(ret, &resp); err != nil {
return nil, err
}

offset += int64(n)
if len(resp.Err) == 0 {
Collaborator

if resp.Err == "" { is easier to understand.

Collaborator Author

Nice. Done.

return &resp.Results, nil
}

return metas, nil
return &resp.Results, errors.New(resp.Err)
}
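
The response handling in remoteChunkMeta, including the `resp.Err == ""` style of check suggested in review, can be exercised without a server. The sketch below decodes a JSON envelope of the shape used by chunkMetaResponse. The function name decodeChunkMeta and the `offset` and `checksum` JSON tags are assumptions (only the `len` tag is visible in this diff), and unlike the PR code it returns a nil result when the server reports an error.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ChunkMeta mirrors the struct in this diff. Only the `len` tag is visible
// in the PR; the other two tags are assumed for this sketch.
type ChunkMeta struct {
	Offset   int64  `json:"offset"`
	Checksum string `json:"checksum"`
	Len      int64  `json:"len"`
}

// chunkMetaResponse is the envelope declared inside remoteChunkMeta.
type chunkMetaResponse struct {
	Err     string    `json:"err"`
	Results ChunkMeta `json:"results"`
}

// decodeChunkMeta (hypothetical) turns a response body into a ChunkMeta or
// an error carrying the server-side message.
func decodeChunkMeta(body []byte) (*ChunkMeta, error) {
	var resp chunkMetaResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	if resp.Err != "" {
		return nil, errors.New(resp.Err)
	}
	return &resp.Results, nil
}

func main() {
	ok := []byte(`{"err":"","results":{"offset":4096,"checksum":"abc","len":10}}`)
	m, err := decodeChunkMeta(ok)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Offset:%d Checksum:%s Len:%d\n", m.Offset, m.Checksum, m.Len)

	_, err = decodeChunkMeta([]byte(`{"err":"bad chunksize","results":{}}`))
	fmt.Println(err)
}
```

Returning nil alongside a non-nil error is a common Go convention that keeps callers from using a half-filled result by accident; whether that fits this codebase is a judgment call for the authors.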
61 changes: 0 additions & 61 deletions go/filemanager/pfsmodules/chunkmeta_test.go
@@ -1,62 +1 @@
package pfsmodules

import (
"crypto/md5"
"fmt"
"testing"
)

func TestGetDiffMeta(t *testing.T) {
var src []ChunkMeta
var dst []ChunkMeta

var data []ChunkMeta

for i := 0; i < 4; i++ {
data = append(data, ChunkMeta{
Offset: int64(i),
Checksum: fmt.Sprintf("%x", md5.Sum([]byte("1"))),
Len: 1})
}

src = data[0:3]
// check when len(dst) is 0
diff, _ := GetDiffChunkMeta(src, dst)
if len(diff) != 3 {
t.Error(len(diff))
}

for i := range diff {
if diff[i] != src[i] {
t.Error(i)
}
}

// check when dst is same as src
dst = src
diff, _ = GetDiffChunkMeta(src, dst)
if len(diff) != 0 {
t.Error(len(diff))
}

// check when dst is small than src
dst = dst[:0]
dst = append(dst, data[0])
dst = append(dst, data[2])

diff, _ = GetDiffChunkMeta(src, dst)
if len(diff) != 1 {
t.Error(len(diff))
}

if diff[0] != data[1] {
t.Error(0)
}

// check when dst is large then src
dst = data
diff, _ = GetDiffChunkMeta(src, dst)
if len(diff) != 0 {
t.Error(len(diff))
}
}