Skip to content

Commit

Permalink
chore: update single piece storage (#3186)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma authored Apr 11, 2024
1 parent 5cee901 commit 35e8bac
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 19 deletions.
53 changes: 36 additions & 17 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,31 @@ func (pt *peerTaskConductor) pullSinglePiece() {
pt.SetTotalPieces(1)
pt.SetPieceMd5Sign(digest.SHA256FromStrings(pt.singlePiece.PieceInfo.PieceMd5))

request := &DownloadPieceRequest{
var (
err error
request *DownloadPieceRequest
result *DownloadPieceResult
)

// fallback to download from other peers with p2p mode
fallback := func() {
span.RecordError(err)
span.SetAttributes(config.AttributePieceSuccess.Bool(false))
span.End()

pt.Warnf("single piece download failed, switch to download from other peers")
pt.ReportPieceResult(request, result, err)

pt.pullPiecesWithP2P()
}

err = pt.UpdateStorage()
if err != nil {
fallback()
return
}

request = &DownloadPieceRequest{
storage: pt.GetStorage(),
piece: pt.singlePiece.PieceInfo,
log: pt.Log(),
Expand All @@ -920,24 +944,19 @@ func (pt *peerTaskConductor) pullSinglePiece() {
DstAddr: pt.singlePiece.DstAddr,
}

if result, err := pt.PieceManager.DownloadPiece(ctx, request); err == nil {
pt.reportSuccessResult(request, result)
pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)

span.SetAttributes(config.AttributePieceSuccess.Bool(true))
span.End()
pt.Infof("single piece download success")
} else {
// fallback to download from other peers
span.RecordError(err)
span.SetAttributes(config.AttributePieceSuccess.Bool(false))
span.End()
result, err = pt.PieceManager.DownloadPiece(ctx, request)
if err != nil {
fallback()
return
}

pt.Warnf("single piece download failed, switch to download from other peers")
pt.ReportPieceResult(request, result, err)
pt.reportSuccessResult(request, result)
pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)

pt.pullPiecesWithP2P()
}
span.SetAttributes(config.AttributePieceSuccess.Bool(true))
span.End()
pt.Infof("single piece download success")
return
}

func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) {
Expand Down
4 changes: 2 additions & 2 deletions client/daemon/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ func (s *server) GetPieceTasks(ctx context.Context, request *commonv1.PieceTaskR
}, nil
}

logger.Debugf("receive get piece tasks request, task id: %s, src peer: %s, dst peer: %s, piece start num: %d, limit: %d, count: %d, total content length: %d",
request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit, len(p.PieceInfos), p.ContentLength)
logger.Debugf("receive get piece tasks request, task id: %s, src peer: %s, dst peer: %s, piece start num: %d, limit: %d, count: %d, total piece: %s, total content length: %d",
request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit, len(p.PieceInfos), p.TotalPiece, p.ContentLength)
p.DstAddr = s.uploadAddr
return p, nil
}
Expand Down

0 comments on commit 35e8bac

Please sign in to comment.