Skip to content

Commit

Permalink
fix: tiny piece digest reader (#3164)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma authored Apr 2, 2024
1 parent d18625b commit 8c9ff38
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 9 deletions.
8 changes: 7 additions & 1 deletion client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,12 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
pt.cancel(commonv1.Code_ClientError, err.Error())
return
}
reader, err := digest.NewReader(digest.AlgorithmMD5, bytes.NewBuffer(pt.tinyData.Content))
if err != nil {
pt.Errorf("create digest reader: %s", err)
pt.cancel(commonv1.Code_ClientError, err.Error())
return
}
n, err := pt.GetStorage().WritePiece(ctx,
&storage.WritePieceRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
Expand All @@ -634,7 +640,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
Style: 0,
},
UnknownLength: false,
Reader: bytes.NewBuffer(pt.tinyData.Content),
Reader: reader,
NeedGenMetadata: func(n int64) (int32, int64, bool) {
return 1, contentLength, true
},
Expand Down
43 changes: 35 additions & 8 deletions client/daemon/peer/peertask_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,11 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
}

type mockManager struct {
testSpec *testSpec
peerTaskManager *peerTaskManager
schedulerClient schedulerclient.V1
storageManager storage.Manager
testSpec *testSpec
peerTaskManager *peerTaskManager
schedulerClient schedulerclient.V1
storageManager storage.Manager
componentsOption componentsOption
}

func (m *mockManager) CleanUp() {
Expand Down Expand Up @@ -320,10 +321,11 @@ func setupMockManager(ctrl *gomock.Controller, ts *testSpec, opt componentsOptio
},
}
return &mockManager{
testSpec: ts,
peerTaskManager: ptm,
schedulerClient: schedulerClient,
storageManager: storageManager,
testSpec: ts,
peerTaskManager: ptm,
schedulerClient: schedulerClient,
storageManager: storageManager,
componentsOption: opt,
}
}

Expand Down Expand Up @@ -733,6 +735,25 @@ func (ts *testSpec) runFileTaskTest(assert *testifyassert.Assertions, require *t
outputBytes, err := os.ReadFile(output)
require.Nil(err, "load output file")
require.Equal(ts.taskData, outputBytes, "output and desired output must match")

ts.checkPieceMd5(require, mm)
}

func (ts *testSpec) checkPieceMd5(require *testifyrequire.Assertions, mm *mockManager) {
pieces, err := mm.storageManager.GetPieces(context.Background(), &commonv1.PieceTaskRequest{
TaskId: mm.componentsOption.taskID,
SrcPid: "",
DstPid: ts.peerID,
StartNum: 0,
Limit: 10000,
})
require.Nil(err, "get all pieces")
if len(ts.taskData) > 0 {
require.Greater(len(pieces.PieceInfos), 0)
for _, piece := range pieces.PieceInfos {
require.NotEmpty(piece.PieceMd5, "piece md5 should not be empty")
}
}
}

func (ts *testSpec) runStreamTaskTest(_ *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
Expand All @@ -748,6 +769,8 @@ func (ts *testSpec) runStreamTaskTest(_ *testifyassert.Assertions, require *test
outputBytes, err := io.ReadAll(r)
require.Nil(err, "load read data")
require.Equal(ts.taskData, outputBytes, "output and desired output must match")

ts.checkPieceMd5(require, mm)
}

func (ts *testSpec) runSeedTaskTest(_ *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
Expand Down Expand Up @@ -792,6 +815,8 @@ loop:
}

require.True(success, "seed task should success")

ts.checkPieceMd5(require, mm)
}

func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
Expand Down Expand Up @@ -955,4 +980,6 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
outputBytes, err := os.ReadFile(output)
assert.Nil(err, "load output file should be ok")
assert.Equal(ts.taskData, outputBytes, "file output and desired output must match")

ts.checkPieceMd5(require, mm)
}

0 comments on commit 8c9ff38

Please sign in to comment.