Skip to content

Commit

Permalink
Merge branch 'main' of github.com:dragonflyoss/Dragonfly2 into featur…
Browse files Browse the repository at this point in the history
…e/charts
  • Loading branch information
gaius-qi committed Aug 11, 2021
2 parents 91b91e6 + c1c3d65 commit 0465aba
Show file tree
Hide file tree
Showing 21 changed files with 130 additions and 104 deletions.
18 changes: 18 additions & 0 deletions cdnsystem/cdnutil/cdn_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,27 @@ package cdnutil
import (
"fmt"

"d7y.io/dragonfly/v2/cdnsystem/config"
"d7y.io/dragonfly/v2/pkg/util/net/iputils"
)

func GenCDNPeerID(taskID string) string {
return fmt.Sprintf("%s-%s_%s", iputils.HostName, taskID, "CDN")
}

// ComputePieceSize computes the piece size with specified fileLength.
//
// If the fileLength<=0, which means failed to get fileLength
// and then use the DefaultPieceSize.
func ComputePieceSize(length int64) int32 {
if length <= 0 || length <= 200*1024*1024 {
return config.DefaultPieceSize
}

gapCount := length / int64(100*1024*1024)
mpSize := (gapCount-2)*1024*1024 + config.DefaultPieceSize
if mpSize > config.DefaultPieceSizeLimit {
return config.DefaultPieceSizeLimit
}
return int32(mpSize)
}
24 changes: 4 additions & 20 deletions cdnsystem/daemon/task/manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ import (
"reflect"
"time"

"d7y.io/dragonfly/v2/cdnsystem/config"
"github.com/pkg/errors"

"d7y.io/dragonfly/v2/cdnsystem/cdnutil"
cdnerrors "d7y.io/dragonfly/v2/cdnsystem/errors"
"d7y.io/dragonfly/v2/cdnsystem/types"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/synclock"
"d7y.io/dragonfly/v2/pkg/util/net/urlutils"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -108,7 +109,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis

// calculate piece size and update the PieceSize and PieceTotal
if task.PieceSize <= 0 {
pieceSize := computePieceSize(task.SourceFileLength)
pieceSize := cdnutil.ComputePieceSize(task.SourceFileLength)
task.PieceSize = pieceSize
}
tm.taskStore.Add(task.TaskID, task)
Expand Down Expand Up @@ -193,20 +194,3 @@ func isSameTask(task1, task2 *types.SeedTask) bool {

return true
}

// computePieceSize computes the piece size with specified fileLength.
//
// If the fileLength<=0, which means failed to get fileLength
// and then use the DefaultPieceSize.
func computePieceSize(length int64) int32 {
if length <= 0 || length <= 200*1024*1024 {
return config.DefaultPieceSize
}

gapCount := length / int64(100*1024*1024)
mpSize := (gapCount-2)*1024*1024 + config.DefaultPieceSize
if mpSize > config.DefaultPieceSizeLimit {
return config.DefaultPieceSizeLimit
}
return int32(mpSize)
}
7 changes: 5 additions & 2 deletions client/daemon/peer/peertask_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/internal/dfcodes"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
Expand Down Expand Up @@ -86,8 +87,8 @@ func newFilePeerTask(ctx context.Context,

logger.Infof("request overview, url: %s, filter: %s, meta: %s, biz: %s, peer: %s", request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag, request.PeerId)
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(regCtx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
regSpan.RecordError(err)
regSpan.End()
Expand All @@ -104,8 +105,10 @@ func newFilePeerTask(ctx context.Context,
needBackSource = true
// can not detect source or scheduler error, create a new dummy scheduler client
schedulerClient = &dummySchedulerClient{}
result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)}
logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId)
}

if result == nil {
defer span.End()
span.RecordError(err)
Expand Down
12 changes: 5 additions & 7 deletions client/daemon/peer/peertask_file_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package peer

import (
"context"
"time"

"d7y.io/dragonfly/v2/client/daemon/storage"
Expand All @@ -27,7 +26,6 @@ import (
)

type filePeerTaskCallback struct {
ctx context.Context
ptm *peerTaskManager
pt *filePeerTask
req *FilePeerTaskRequest
Expand All @@ -42,7 +40,7 @@ func (p *filePeerTaskCallback) GetStartTime() time.Time {

func (p *filePeerTaskCallback) Init(pt Task) error {
// prepare storage
err := p.ptm.storageManager.RegisterTask(p.ctx,
err := p.ptm.storageManager.RegisterTask(p.pt.ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
Expand All @@ -60,7 +58,7 @@ func (p *filePeerTaskCallback) Init(pt Task) error {

func (p *filePeerTaskCallback) Update(pt Task) error {
// update storage
err := p.ptm.storageManager.UpdateTask(p.ctx,
err := p.ptm.storageManager.UpdateTask(p.pt.ctx,
&storage.UpdateTaskRequest{
PeerTaskMetaData: storage.PeerTaskMetaData{
PeerID: pt.GetPeerID(),
Expand All @@ -79,7 +77,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
var cost = time.Now().Sub(p.start).Milliseconds()
pt.Log().Infof("file peer task done, cost: %dms", cost)
e := p.ptm.storageManager.Store(
context.Background(),
p.pt.ctx,
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
Expand All @@ -93,7 +91,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand All @@ -118,7 +116,7 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("file peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand Down
12 changes: 6 additions & 6 deletions client/daemon/peer/peertask_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@ import (
"testing"
"time"

"d7y.io/dragonfly/v2/cdnsystem/cdnutil"
"d7y.io/dragonfly/v2/pkg/rpc/base"
rangers "d7y.io/dragonfly/v2/pkg/util/rangeutils"

"github.com/golang/mock/gomock"
testifyassert "github.com/stretchr/testify/assert"

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/test"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"d7y.io/dragonfly/v2/pkg/source"
sourceMock "d7y.io/dragonfly/v2/pkg/source/mock"
"github.com/golang/mock/gomock"
testifyassert "github.com/stretchr/testify/assert"
)

func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) {
Expand Down Expand Up @@ -100,7 +102,7 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) {
pieceManager: &pieceManager{
storageManager: storageManager,
pieceDownloader: downloader,
computePieceSize: computePieceSize,
computePieceSize: cdnutil.ComputePieceSize,
},
storageManager: storageManager,
schedulerClient: schedulerClient,
Expand Down Expand Up @@ -131,7 +133,6 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) {
pt.needBackSource = true

pt.SetCallback(&filePeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down Expand Up @@ -219,7 +220,7 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) {
pieceManager: &pieceManager{
storageManager: storageManager,
pieceDownloader: downloader,
computePieceSize: computePieceSize,
computePieceSize: cdnutil.ComputePieceSize,
},
storageManager: storageManager,
schedulerClient: schedulerClient,
Expand Down Expand Up @@ -250,7 +251,6 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) {
pt.needBackSource = true

pt.SetCallback(&filePeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down
2 changes: 0 additions & 2 deletions client/daemon/peer/peertask_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer
return nil, tiny, nil
}
pt.SetCallback(&filePeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down Expand Up @@ -224,7 +223,6 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu

pt.SetCallback(
&streamPeerTaskCallback{
ctx: ctx,
ptm: ptm,
pt: pt,
req: req,
Expand Down
3 changes: 2 additions & 1 deletion client/daemon/peer/peertask_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
testifyassert "github.com/stretchr/testify/assert"
"google.golang.org/grpc"

"d7y.io/dragonfly/v2/cdnsystem/cdnutil"
"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/storage"
Expand Down Expand Up @@ -362,7 +363,7 @@ func TestPeerTaskManager_StartStreamPeerTask_BackSource(t *testing.T) {
pieceManager: &pieceManager{
storageManager: storageManager,
pieceDownloader: NewMockPieceDownloader(ctrl),
computePieceSize: computePieceSize,
computePieceSize: cdnutil.ComputePieceSize,
},
storageManager: storageManager,
schedulerClient: sched,
Expand Down
7 changes: 4 additions & 3 deletions client/daemon/peer/peertask_reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ import (
"io"
"time"

"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/internal/dfcodes"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
)

var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
Expand Down
7 changes: 5 additions & 2 deletions client/daemon/peer/peertask_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/internal/dfcodes"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
Expand Down Expand Up @@ -69,8 +70,8 @@ func newStreamPeerTask(ctx context.Context,
logger.Debugf("request overview, pid: %s, url: %s, filter: %s, meta: %s, tag: %s",
request.PeerId, request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag)
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(regCtx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
regSpan.RecordError(err)
regSpan.End()
Expand All @@ -87,8 +88,10 @@ func newStreamPeerTask(ctx context.Context,
needBackSource = true
// can not detect source or scheduler error, create a new dummy scheduler client
schedulerClient = &dummySchedulerClient{}
result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)}
logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId)
}

if result == nil {
defer span.End()
span.RecordError(err)
Expand Down
12 changes: 5 additions & 7 deletions client/daemon/peer/peertask_stream_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package peer

import (
"context"
"time"

"d7y.io/dragonfly/v2/client/daemon/storage"
Expand All @@ -27,7 +26,6 @@ import (
)

type streamPeerTaskCallback struct {
ctx context.Context
ptm *peerTaskManager
pt *streamPeerTask
req *scheduler.PeerTaskRequest
Expand All @@ -42,7 +40,7 @@ func (p *streamPeerTaskCallback) GetStartTime() time.Time {

func (p *streamPeerTaskCallback) Init(pt Task) error {
// prepare storage
err := p.ptm.storageManager.RegisterTask(p.ctx,
err := p.ptm.storageManager.RegisterTask(p.pt.ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
Expand All @@ -59,7 +57,7 @@ func (p *streamPeerTaskCallback) Init(pt Task) error {

func (p *streamPeerTaskCallback) Update(pt Task) error {
// update storage
err := p.ptm.storageManager.UpdateTask(p.ctx,
err := p.ptm.storageManager.UpdateTask(p.pt.ctx,
&storage.UpdateTaskRequest{
PeerTaskMetaData: storage.PeerTaskMetaData{
PeerID: pt.GetPeerID(),
Expand All @@ -78,7 +76,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
var cost = time.Now().Sub(p.start).Milliseconds()
pt.Log().Infof("stream peer task done, cost: %dms", cost)
e := p.ptm.storageManager.Store(
context.Background(),
p.pt.ctx,
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
Expand All @@ -91,7 +89,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand All @@ -116,7 +114,7 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
Expand Down
Loading

0 comments on commit 0465aba

Please sign in to comment.