From 35e8bac343708e72a06ea1672df57dc0a3a5ffb5 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Thu, 11 Apr 2024 15:17:12 +0800 Subject: [PATCH] chore: update single piece storage (#3186) Signed-off-by: Jim Ma --- client/daemon/peer/peertask_conductor.go | 53 ++++++++++++++++-------- client/daemon/rpcserver/rpcserver.go | 4 +- 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 126dc963090..49009e6cb7f 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -910,7 +910,31 @@ func (pt *peerTaskConductor) pullSinglePiece() { pt.SetTotalPieces(1) pt.SetPieceMd5Sign(digest.SHA256FromStrings(pt.singlePiece.PieceInfo.PieceMd5)) - request := &DownloadPieceRequest{ + var ( + err error + request *DownloadPieceRequest + result *DownloadPieceResult + ) + + // fallback to download from other peers with p2p mode + fallback := func() { + span.RecordError(err) + span.SetAttributes(config.AttributePieceSuccess.Bool(false)) + span.End() + + pt.Warnf("single piece download failed, switch to download from other peers") + pt.ReportPieceResult(request, result, err) + + pt.pullPiecesWithP2P() + } + + err = pt.UpdateStorage() + if err != nil { + fallback() + return + } + + request = &DownloadPieceRequest{ storage: pt.GetStorage(), piece: pt.singlePiece.PieceInfo, log: pt.Log(), @@ -920,24 +944,19 @@ func (pt *peerTaskConductor) pullSinglePiece() { DstAddr: pt.singlePiece.DstAddr, } - if result, err := pt.PieceManager.DownloadPiece(ctx, request); err == nil { - pt.reportSuccessResult(request, result) - pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize) - - span.SetAttributes(config.AttributePieceSuccess.Bool(true)) - span.End() - pt.Infof("single piece download success") - } else { - // fallback to download from other peers - span.RecordError(err) - span.SetAttributes(config.AttributePieceSuccess.Bool(false)) - span.End() + result, err = pt.PieceManager.DownloadPiece(ctx, request) + if err != nil { + fallback() + return + } - pt.Warnf("single piece download failed, switch to download from other peers") - pt.ReportPieceResult(request, result, err) + pt.reportSuccessResult(request, result) + pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize) - pt.pullPiecesWithP2P() - } + span.SetAttributes(config.AttributePieceSuccess.Bool(true)) + span.End() + pt.Infof("single piece download success") + return } func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) { diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index d64b506d1db..f37ce88a49b 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -224,8 +224,8 @@ func (s *server) GetPieceTasks(ctx context.Context, request *commonv1.PieceTaskR }, nil } - logger.Debugf("receive get piece tasks request, task id: %s, src peer: %s, dst peer: %s, piece start num: %d, limit: %d, count: %d, total content length: %d", - request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit, len(p.PieceInfos), p.ContentLength) + logger.Debugf("receive get piece tasks request, task id: %s, src peer: %s, dst peer: %s, piece start num: %d, limit: %d, count: %d, total piece: %s, total content length: %d", + request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit, len(p.PieceInfos), p.TotalPiece, p.ContentLength) p.DstAddr = s.uploadAddr return p, nil }