Skip to content

Commit

Permalink
feat: support limit from dfget client (#578)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma authored Aug 26, 2021
1 parent 581f178 commit 4a889fb
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 11 deletions.
1 change: 1 addition & 0 deletions client/daemon/peer/peertask_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
reasonReScheduleTimeout = "wait more available peers from scheduler timeout"
reasonContextCanceled = "context canceled"
reasonPeerGoneFromScheduler = "scheduler says client should disconnect"
reasonBackSourceDisabled = "download from source disabled"

failedReasonNotSet = "unknown"
failedCodeNotSet = 0
Expand Down
31 changes: 24 additions & 7 deletions client/daemon/peer/peertask_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ import (

type FilePeerTaskRequest struct {
scheduler.PeerTaskRequest
Output string
Output string
Limit float64
DisableBackSource bool
Pattern string
Callsystem string
}

// FilePeerTask represents a peer task to download a file
Expand All @@ -52,6 +56,11 @@ type filePeerTask struct {
// progressCh holds progress status
progressCh chan *FilePeerTaskProgress
progressStopCh chan bool

// disableBackSource indicates not back source when failed
disableBackSource bool
pattern string
callsystem string
}

var _ FilePeerTask = (*filePeerTask)(nil)
Expand All @@ -75,7 +84,7 @@ type FilePeerTaskProgress struct {
func newFilePeerTask(ctx context.Context,
host *scheduler.PeerHost,
pieceManager PieceManager,
request *scheduler.PeerTaskRequest,
request *FilePeerTaskRequest,
schedulerClient schedulerclient.SchedulerClient,
schedulerOption config.SchedulerOption,
perPeerRateLimit rate.Limit) (context.Context, *filePeerTask, *TinyData, error) {
Expand All @@ -89,7 +98,7 @@ func newFilePeerTask(ctx context.Context,
// trace register
regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
logger.Infof("step 1: peer %s start to register", request.PeerId)
result, err := schedulerClient.RegisterPeerTask(regCtx, request)
result, err := schedulerClient.RegisterPeerTask(regCtx, &request.PeerTaskRequest)
regSpan.RecordError(err)
regSpan.End()

Expand Down Expand Up @@ -149,7 +158,7 @@ func newFilePeerTask(ctx context.Context,
}
}
logger.Infof("step 2: start report peer %s piece result", request.PeerId)
peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, &request.PeerTaskRequest)
if err != nil {
logger.Errorf("step 2: peer %s report piece failed: err", request.PeerId, err)
defer span.End()
Expand All @@ -162,12 +171,15 @@ func newFilePeerTask(ctx context.Context,
limiter = rate.NewLimiter(perPeerRateLimit, int(perPeerRateLimit))
}
pt := &filePeerTask{
progressCh: make(chan *FilePeerTaskProgress),
progressStopCh: make(chan bool),
progressCh: make(chan *FilePeerTaskProgress),
progressStopCh: make(chan bool),
disableBackSource: request.DisableBackSource,
pattern: request.Pattern,
callsystem: request.Callsystem,
peerTask: peerTask{
host: host,
needBackSource: needBackSource,
request: request,
request: &request.PeerTaskRequest,
peerPacketStream: peerPacketStream,
pieceManager: pieceManager,
peerPacketReady: make(chan bool, 1),
Expand Down Expand Up @@ -416,6 +428,11 @@ func (pt *filePeerTask) SetContentLength(i int64) error {

func (pt *filePeerTask) backSource() {
defer pt.cleanUnfinished()
if pt.disableBackSource {
pt.Errorf(reasonBackSourceDisabled)
pt.failedReason = reasonBackSourceDisabled
return
}
err := pt.pieceManager.DownloadSource(pt.ctx, pt, pt.request)
if err != nil {
pt.Errorf("download from source error: %s", err)
Expand Down
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) {
_, pt, _, err := newFilePeerTask(ctx,
ptm.host,
ptm.pieceManager,
&req.PeerTaskRequest,
req,
ptm.schedulerClient,
ptm.schedulerOption,
0)
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) {
_, pt, _, err := newFilePeerTask(ctx,
ptm.host,
ptm.pieceManager,
&req.PeerTaskRequest,
req,
ptm.schedulerClient,
ptm.schedulerOption,
0)
Expand Down
6 changes: 5 additions & 1 deletion client/daemon/peer/peertask_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,12 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer
}
// TODO ensure scheduler is ok first
start := time.Now()
limit := ptm.perPeerRateLimit
if req.Limit > 0 {
limit = rate.Limit(req.Limit)
}
ctx, pt, tiny, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager,
&req.PeerTaskRequest, ptm.schedulerClient, ptm.schedulerOption, ptm.perPeerRateLimit)
req, ptm.schedulerClient, ptm.schedulerOption, limit)
if err != nil {
return nil, nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion client/daemon/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ func (m *server) Download(ctx context.Context,
PeerId: clientutil.GenPeerID(m.peerHost),
PeerHost: m.peerHost,
},
Output: req.Output,
Output: req.Output,
Limit: req.Limit,
DisableBackSource: req.DisableBackSource,
Pattern: req.Pattern,
Callsystem: req.Callsystem,
}
log := logger.With("peer", peerTask.PeerId, "component", "downloadService")

Expand Down

0 comments on commit 4a889fb

Please sign in to comment.