diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index f668db86ff9..8a87e128aa7 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -47,6 +47,7 @@ const ( reasonReScheduleTimeout = "wait more available peers from scheduler timeout" reasonContextCanceled = "context canceled" reasonPeerGoneFromScheduler = "scheduler says client should disconnect" + reasonBackSourceDisabled = "download from source disabled" failedReasonNotSet = "unknown" failedCodeNotSet = 0 diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index c42daedba86..baf1dcd8f52 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -37,7 +37,11 @@ import ( type FilePeerTaskRequest struct { scheduler.PeerTaskRequest - Output string + Output string + Limit float64 + DisableBackSource bool + Pattern string + Callsystem string } // FilePeerTask represents a peer task to download a file @@ -52,6 +56,11 @@ type filePeerTask struct { // progressCh holds progress status progressCh chan *FilePeerTaskProgress progressStopCh chan bool + + // disableBackSource indicates not back source when failed + disableBackSource bool + pattern string + callsystem string } var _ FilePeerTask = (*filePeerTask)(nil) @@ -75,7 +84,7 @@ type FilePeerTaskProgress struct { func newFilePeerTask(ctx context.Context, host *scheduler.PeerHost, pieceManager PieceManager, - request *scheduler.PeerTaskRequest, + request *FilePeerTaskRequest, schedulerClient schedulerclient.SchedulerClient, schedulerOption config.SchedulerOption, perPeerRateLimit rate.Limit) (context.Context, *filePeerTask, *TinyData, error) { @@ -89,7 +98,7 @@ func newFilePeerTask(ctx context.Context, // trace register regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask) logger.Infof("step 1: peer %s start to register", request.PeerId) - result, err := schedulerClient.RegisterPeerTask(regCtx, request) + result, err := schedulerClient.RegisterPeerTask(regCtx, &request.PeerTaskRequest) regSpan.RecordError(err) regSpan.End() @@ -149,7 +158,7 @@ func newFilePeerTask(ctx context.Context, } } logger.Infof("step 2: start report peer %s piece result", request.PeerId) - peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) + peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, &request.PeerTaskRequest) if err != nil { logger.Errorf("step 2: peer %s report piece failed: err", request.PeerId, err) defer span.End() @@ -162,12 +171,15 @@ func newFilePeerTask(ctx context.Context, limiter = rate.NewLimiter(perPeerRateLimit, int(perPeerRateLimit)) } pt := &filePeerTask{ - progressCh: make(chan *FilePeerTaskProgress), - progressStopCh: make(chan bool), + progressCh: make(chan *FilePeerTaskProgress), + progressStopCh: make(chan bool), + disableBackSource: request.DisableBackSource, + pattern: request.Pattern, + callsystem: request.Callsystem, peerTask: peerTask{ host: host, needBackSource: needBackSource, - request: request, + request: &request.PeerTaskRequest, peerPacketStream: peerPacketStream, pieceManager: pieceManager, peerPacketReady: make(chan bool, 1), @@ -416,6 +428,11 @@ func (pt *filePeerTask) SetContentLength(i int64) error { func (pt *filePeerTask) backSource() { defer pt.cleanUnfinished() + if pt.disableBackSource { + pt.Errorf(reasonBackSourceDisabled) + pt.failedReason = reasonBackSourceDisabled + return + } err := pt.pieceManager.DownloadSource(pt.ctx, pt, pt.request) if err != nil { pt.Errorf("download from source error: %s", err) diff --git a/client/daemon/peer/peertask_file_test.go b/client/daemon/peer/peertask_file_test.go index 83aed0bb1e6..58ae8041621 100644 --- a/client/daemon/peer/peertask_file_test.go +++ b/client/daemon/peer/peertask_file_test.go @@ -125,7 +125,7 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) { _, pt, _, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager, - &req.PeerTaskRequest, + req, ptm.schedulerClient, ptm.schedulerOption, 0) @@ -243,7 +243,7 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) { _, pt, _, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager, - &req.PeerTaskRequest, + req, ptm.schedulerClient, ptm.schedulerOption, 0) diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 3fb75d5e513..452329fa73b 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -147,8 +147,12 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer } // TODO ensure scheduler is ok first start := time.Now() + limit := ptm.perPeerRateLimit + if req.Limit > 0 { + limit = rate.Limit(req.Limit) + } ctx, pt, tiny, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager, - &req.PeerTaskRequest, ptm.schedulerClient, ptm.schedulerOption, ptm.perPeerRateLimit) + req, ptm.schedulerClient, ptm.schedulerOption, limit) if err != nil { return nil, nil, err } diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 81c8e792f13..ffbe2dd18bc 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -144,7 +144,11 @@ func (m *server) Download(ctx context.Context, PeerId: clientutil.GenPeerID(m.peerHost), PeerHost: m.peerHost, }, - Output: req.Output, + Output: req.Output, + Limit: req.Limit, + DisableBackSource: req.DisableBackSource, + Pattern: req.Pattern, + Callsystem: req.Callsystem, } log := logger.With("peer", peerTask.PeerId, "component", "downloadService")