-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Fix download and upload large file bugs #321
Changes from 7 commits
f1d18d1
7e89a03
5873476
f818096
060b327
52e0f5d
14f3ca5
a8c6129
da76370
cb55ca5
dca2e54
0ed330d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,23 +3,24 @@ package pfsmodules | |
import ( | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net/url" | ||
"os" | ||
"strconv" | ||
|
||
log "github.com/golang/glog" | ||
) | ||
|
||
// Chunk respresents a chunk info. | ||
type Chunk struct { | ||
// ChunkParam respresents a chunk info. | ||
type ChunkParam struct { | ||
Path string | ||
Offset int64 | ||
Size int64 | ||
} | ||
|
||
// ToString packs info of ChunkParam. | ||
func (p *ChunkParam) ToString() string { | ||
return fmt.Sprintf("Path:%s Offset:%d Size:%d", p.Path, p.Offset, p.Size) | ||
} | ||
|
||
// ToURLParam encodes variables to url encoding parameters. | ||
func (p *Chunk) ToURLParam() url.Values { | ||
func (p *ChunkParam) ToURLParam() url.Values { | ||
parameters := url.Values{} | ||
parameters.Add("path", p.Path) | ||
|
||
|
@@ -32,11 +33,11 @@ func (p *Chunk) ToURLParam() url.Values { | |
return parameters | ||
} | ||
|
||
// ParseChunk get a Chunk struct from path. | ||
// ParseChunkParam get a Chunk struct from path. | ||
// path example: | ||
// path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096 | ||
func ParseChunk(path string) (*Chunk, error) { | ||
cmd := Chunk{} | ||
func ParseChunkParam(path string) (*ChunkParam, error) { | ||
cmd := ChunkParam{} | ||
|
||
m, err := url.ParseQuery(path) | ||
if err != nil || | ||
|
@@ -61,38 +62,22 @@ func ParseChunk(path string) (*Chunk, error) { | |
return &cmd, nil | ||
} | ||
|
||
// LoadChunkData loads a specified chunk to io.Writer. | ||
func (p *Chunk) LoadChunkData(w io.Writer) error { | ||
f, err := os.Open(p.Path) | ||
if err != nil { | ||
return err | ||
} | ||
defer Close(f) | ||
|
||
_, err = f.Seek(p.Offset, 0) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
loaded, err := io.CopyN(w, f, p.Size) | ||
log.V(2).Infof("loaded:%d\n", loaded) | ||
return err | ||
// Chunk is struct. | ||
type Chunk struct { | ||
Offset int64 | ||
Len int64 | ||
Checksum string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 有道理。这样是否会更好一些:
读数据的时候还是需要这个结构体的。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 赞! |
||
Data []byte | ||
} | ||
|
||
// SaveChunkData save data from io.Reader. | ||
func (p *Chunk) SaveChunkData(r io.Reader) error { | ||
f, err := os.OpenFile(p.Path, os.O_WRONLY, 0600) | ||
if err != nil { | ||
return err | ||
} | ||
defer Close(f) | ||
|
||
_, err = f.Seek(p.Offset, 0) | ||
if err != nil { | ||
return err | ||
} | ||
// ToString packs info of Chunk | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ToString -> String There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done。 |
||
func (c *Chunk) ToString() string { | ||
return fmt.Sprintf("Offset:%d Len:%d Checksum:%s", c.Offset, c.Len, c.Checksum) | ||
} | ||
|
||
writen, err := io.CopyN(f, r, p.Size) | ||
log.V(2).Infof("chunksize:%d writen:%d\n", p.Size, writen) | ||
return err | ||
// NewChunk make a Chunk struct. | ||
func NewChunk(capcity int64) *Chunk { | ||
c := Chunk{} | ||
c.Data = make([]byte, capcity) | ||
return &c | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,15 @@ | ||
package pfsmodules | ||
|
||
import ( | ||
"crypto/md5" | ||
"encoding/hex" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"sort" | ||
"strconv" | ||
|
||
"github.com/PaddlePaddle/cloud/go/utils/restclient" | ||
) | ||
|
||
const ( | ||
|
@@ -30,10 +28,16 @@ type ChunkMeta struct { | |
Len int64 `json:"len"` | ||
} | ||
|
||
// ToString pack a info tring of ChunkMeta. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ToString -> String There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 惭愧。Done。 |
||
func (m *ChunkMeta) ToString() string { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a requirement, but could consider using the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 应该改名字为String()。Done。 |
||
return fmt.Sprintf("Offset:%d Checksum:%s Len:%d", m.Offset, m.Checksum, m.Len) | ||
} | ||
|
||
// ChunkMetaCmd is a command. | ||
type ChunkMetaCmd struct { | ||
Method string `json:"method"` | ||
FilePath string `json:"path"` | ||
Offset int64 `json:"offset"` | ||
ChunkSize int64 `json:"chunksize"` | ||
} | ||
|
||
|
@@ -46,6 +50,9 @@ func (p *ChunkMetaCmd) ToURLParam() url.Values { | |
str := fmt.Sprint(p.ChunkSize) | ||
parameters.Add("chunksize", str) | ||
|
||
str = fmt.Sprint(p.Offset) | ||
parameters.Add("offset", str) | ||
|
||
return parameters | ||
} | ||
|
||
|
@@ -56,7 +63,23 @@ func (p *ChunkMetaCmd) ToJSON() ([]byte, error) { | |
|
||
// Run is a functions which run ChunkMetaCmd. | ||
func (p *ChunkMetaCmd) Run() (interface{}, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious, is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CLI中确实没有此命令。当时考虑到他实现了Command Interface所以为了和ChunkMeta区别命名成了这样。 |
||
return GetChunkMeta(p.FilePath, p.ChunkSize) | ||
f := FileHandle{} | ||
if err := f.Open(p.FilePath, os.O_RDONLY, 0); err != nil { | ||
return nil, err | ||
} | ||
|
||
defer f.Close() | ||
|
||
c, err := f.ReadChunk(p.Offset, p.ChunkSize) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这里并没有read chunk(读chunk的内容),只是读了meta,考虑改成LoadMeta? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 在FileHandle中加了接口。Done。 |
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &ChunkMeta{ | ||
Offset: p.Offset, | ||
Checksum: c.Checksum, | ||
Len: c.Len, | ||
}, nil | ||
} | ||
|
||
func (p *ChunkMetaCmd) checkChunkSize() error { | ||
|
@@ -87,11 +110,13 @@ func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) { | |
method := r.URL.Query().Get("method") | ||
path := r.URL.Query().Get("path") | ||
chunkStr := r.URL.Query().Get("chunksize") | ||
offsetStr := r.URL.Query().Get("offset") | ||
|
||
if len(method) == 0 || | ||
method != ChunkMetaCmdName || | ||
len(path) == 0 || | ||
len(chunkStr) == 0 { | ||
len(chunkStr) == 0 || | ||
len(offsetStr) == 0 { | ||
return nil, errors.New(http.StatusText(http.StatusBadRequest)) | ||
} | ||
|
||
|
@@ -100,105 +125,46 @@ func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) { | |
return nil, errors.New(StatusBadChunkSize) | ||
} | ||
|
||
offset, err := strconv.ParseInt(offsetStr, 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusBadChunkSize) | ||
} | ||
|
||
return &ChunkMetaCmd{ | ||
Method: method, | ||
FilePath: path, | ||
ChunkSize: chunkSize, | ||
Offset: offset, | ||
}, nil | ||
} | ||
|
||
type metaSlice []ChunkMeta | ||
|
||
func (a metaSlice) Len() int { return len(a) } | ||
func (a metaSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } | ||
func (a metaSlice) Less(i, j int) bool { return a[i].Offset < a[j].Offset } | ||
|
||
// GetDiffChunkMeta gets difference between srcMeta and dstMeta. | ||
func GetDiffChunkMeta(srcMeta []ChunkMeta, dstMeta []ChunkMeta) ([]ChunkMeta, error) { | ||
if len(dstMeta) == 0 || len(srcMeta) == 0 { | ||
return srcMeta, nil | ||
} | ||
|
||
if !sort.IsSorted(metaSlice(srcMeta)) { | ||
sort.Sort(metaSlice(srcMeta)) | ||
} | ||
|
||
if !sort.IsSorted(metaSlice(dstMeta)) { | ||
sort.Sort(metaSlice(dstMeta)) | ||
} | ||
|
||
dstIdx := 0 | ||
srcIdx := 0 | ||
diff := make([]ChunkMeta, 0, len(srcMeta)) | ||
|
||
for { | ||
if srcMeta[srcIdx].Offset < dstMeta[dstIdx].Offset { | ||
diff = append(diff, srcMeta[srcIdx]) | ||
srcIdx++ | ||
} else if srcMeta[srcIdx].Offset > dstMeta[dstIdx].Offset { | ||
dstIdx++ | ||
} else { | ||
if srcMeta[srcIdx].Checksum != dstMeta[dstIdx].Checksum { | ||
diff = append(diff, srcMeta[srcIdx]) | ||
} | ||
|
||
dstIdx++ | ||
srcIdx++ | ||
} | ||
|
||
if dstIdx >= len(dstMeta) { | ||
break | ||
} | ||
|
||
if srcIdx >= len(srcMeta) { | ||
break | ||
} | ||
} | ||
|
||
if srcIdx < len(srcMeta) { | ||
diff = append(diff, srcMeta[srcIdx:]...) | ||
func remoteChunkMeta(path string, offset int64, | ||
chunkSize int64) (*ChunkMeta, error) { | ||
cmd := ChunkMetaCmd{ | ||
Method: ChunkMetaCmdName, | ||
FilePath: path, | ||
ChunkSize: chunkSize, | ||
Offset: offset, | ||
} | ||
|
||
return diff, nil | ||
} | ||
|
||
// GetChunkMeta gets chunk metas from path of file. | ||
func GetChunkMeta(path string, len int64) ([]ChunkMeta, error) { | ||
f, err := os.Open(path) | ||
t := fmt.Sprintf("%s/api/v1/pfs/chunks", Config.ActiveConfig.Endpoint) | ||
ret, err := restclient.GetCall(t, cmd.ToURLParam()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer Close(f) | ||
|
||
if len > defaultMaxChunkSize || len < defaultMinChunkSize { | ||
return nil, errors.New(StatusBadChunkSize) | ||
type chunkMetaResponse struct { | ||
Err string `json:"err"` | ||
Results ChunkMeta `json:"results"` | ||
} | ||
|
||
var metas []ChunkMeta | ||
|
||
data := make([]byte, len) | ||
offset := int64(0) | ||
|
||
for { | ||
n, err := f.Read(data) | ||
if err != nil && err != io.EOF { | ||
return nil, err | ||
} | ||
|
||
if err == io.EOF { | ||
break | ||
} | ||
|
||
m := ChunkMeta{} | ||
m.Offset = offset | ||
sum := md5.Sum(data[:n]) | ||
m.Checksum = hex.EncodeToString(sum[:]) | ||
m.Len = int64(n) | ||
|
||
metas = append(metas, m) | ||
resp := chunkMetaResponse{} | ||
if err := json.Unmarshal(ret, &resp); err != nil { | ||
return nil, err | ||
} | ||
|
||
offset += int64(n) | ||
if len(resp.Err) == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 赞。Done。 |
||
return &resp.Results, nil | ||
} | ||
|
||
return metas, nil | ||
return &resp.Results, errors.New(resp.Err) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,62 +1 @@ | ||
package pfsmodules | ||
|
||
import ( | ||
"crypto/md5" | ||
"fmt" | ||
"testing" | ||
) | ||
|
||
func TestGetDiffMeta(t *testing.T) { | ||
var src []ChunkMeta | ||
var dst []ChunkMeta | ||
|
||
var data []ChunkMeta | ||
|
||
for i := 0; i < 4; i++ { | ||
data = append(data, ChunkMeta{ | ||
Offset: int64(i), | ||
Checksum: fmt.Sprintf("%x", md5.Sum([]byte("1"))), | ||
Len: 1}) | ||
} | ||
|
||
src = data[0:3] | ||
// check when len(dst) is 0 | ||
diff, _ := GetDiffChunkMeta(src, dst) | ||
if len(diff) != 3 { | ||
t.Error(len(diff)) | ||
} | ||
|
||
for i := range diff { | ||
if diff[i] != src[i] { | ||
t.Error(i) | ||
} | ||
} | ||
|
||
// check when dst is same as src | ||
dst = src | ||
diff, _ = GetDiffChunkMeta(src, dst) | ||
if len(diff) != 0 { | ||
t.Error(len(diff)) | ||
} | ||
|
||
// check when dst is small than src | ||
dst = dst[:0] | ||
dst = append(dst, data[0]) | ||
dst = append(dst, data[2]) | ||
|
||
diff, _ = GetDiffChunkMeta(src, dst) | ||
if len(diff) != 1 { | ||
t.Error(len(diff)) | ||
} | ||
|
||
if diff[0] != data[1] { | ||
t.Error(0) | ||
} | ||
|
||
// check when dst is large then src | ||
dst = data | ||
diff, _ = GetDiffChunkMeta(src, dst) | ||
if len(diff) != 0 { | ||
t.Error(len(diff)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a requirement, but it seems a lot of effort to write
ToURLParam
andParseChunkParam
manually for each type that we want to pass over the wire. If we use json, the json encoder / decoder can do these automatically for us, and always correctly.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个是这样的,REST API中get方法标准的做法是body中没有任何的内容,也就是通过URL本身即可完成访问。而URL中大家用的最多的还是URL Parameters,好像很少把JSON串Encode到URL Parameters中。
不知道我的这个理解是否正确?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
谢谢!明白了,确实需要转成URL parameters。可以考虑写一个类似于json.Marshal和json.Unmarshal的函数。通过reflection自动encode / decode一个结构体。我搜了一下没找到开源的,自己写应该不难。这个repo有些相关,但其好像多做了很多东西,可以作为参考:/~https://github.com/tonto/qparams
不是这个PR的merge blocker哈。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
恩。这个一直想做,后来主线业务多把这个给耽误了。
不过现在想想,没有类似JSON.Unmarshal的好的开源,我要是搞出来会不会增加一下我们PaddlePadle的Star的数目?
有点意思,周末先搞这个一下。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gongweibao 赞,如果想让其他人用上,放成一个小repo比较方便(写到paddle里面别人要用的话依赖太大了),这个很独立,可以考虑自己建一个repo开源这个项目:)