Skip to content

Commit

Permalink
Refactor scheduler (#369)
Browse files Browse the repository at this point in the history
Signed-off-by: santong <weipeng.swp@alibaba-inc.com>
  • Loading branch information
244372610 authored Jul 22, 2021
1 parent 8efafd8 commit f535dfb
Show file tree
Hide file tree
Showing 82 changed files with 4,718 additions and 4,625 deletions.
6 changes: 3 additions & 3 deletions cdnsystem/daemon/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMe
return nil, errors.Wrapf(err, "write piece meta records failed")
}
}
// todo already download done, piece 信息已经写完但是meta信息还没有完成更新
// TODO already download done, piece 信息已经写完但是meta信息还没有完成更新
//if metaData.SourceFileLen >=0 && int64(breakPoint) == metaData.SourceFileLen {
// return &cacheResult{
// breakPoint: -1,
Expand All @@ -198,7 +198,7 @@ func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMe
// fileMd5: fileMd5,
// }, nil
//}
// todo 整理数据文件 truncate breakpoint之后的数据内容
// TODO 整理数据文件 truncate breakpoint之后的数据内容
return &cacheResult{
breakPoint: int64(breakPoint),
pieceMetaRecords: pieceMetaRecords,
Expand Down Expand Up @@ -247,7 +247,7 @@ func checkSameFile(task *types.SeedTask, metaData *storage.FileMetaData) error {

//checkPieceContent read piece content from reader and check data integrity by pieceMetaRecord
func checkPieceContent(reader io.Reader, pieceRecord *storage.PieceMetaRecord, fileMd5 hash.Hash) error {
// todo Analyze the original data for the slice format to calculate fileMd5
// TODO Analyze the original data for the slice format to calculate fileMd5
pieceMd5 := md5.New()
tee := io.TeeReader(io.TeeReader(io.LimitReader(reader, int64(pieceRecord.PieceLen)), pieceMd5), fileMd5)
if n, err := io.Copy(ioutil.Discard, tee); n != int64(pieceRecord.PieceLen) || err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cdnsystem/daemon/cdn/cache_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (cw *cacheWriter) startWriter(reader io.Reader, task *types.SeedTask, detec
if err != nil {
return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("stat cdn download file: %v", err)
}
// todo Try getting it from the ProgressManager first
// TODO Try getting it from the ProgressManager first
pieceMd5Sign, _, err := cw.cacheDataManager.getPieceMd5Sign(task.TaskID)
if err != nil {
return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("get piece md5 sign: %v", err)
Expand Down Expand Up @@ -138,7 +138,7 @@ func (cw *cacheWriter) writerPool(wg *sync.WaitGroup, routineCount int, pieceCh
go func() {
defer wg.Done()
for piece := range pieceCh {
// todo Subsequent compression and other features are implemented through waitToWriteContent and pieceStyle
// TODO Subsequent compression and other features are implemented through waitToWriteContent and pieceStyle
waitToWriteContent := piece.pieceContent
originPieceLen := waitToWriteContent.Len() // the length of the original data that has not been processed
pieceLen := originPieceLen // the real length written to the storage medium after processing
Expand Down
4 changes: 1 addition & 3 deletions cdnsystem/daemon/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
_ "d7y.io/dragonfly/v2/cdnsystem/daemon/cdn/storage/disk" // To register diskStorage
_ "d7y.io/dragonfly/v2/cdnsystem/daemon/cdn/storage/hybrid" // To register hybridStorage
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server"

"d7y.io/dragonfly/v2/pkg/synclock"
"d7y.io/dragonfly/v2/pkg/util/timeutils"

Expand Down Expand Up @@ -166,9 +167,6 @@ func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceMd5 string, downl
errorMsg = fmt.Sprintf("task total piece count not match expected: %d real: %d", task.PieceTotal, downloadMetadata.pieceTotalCount)
isSuccess = false
}
if !stringutils.IsBlank(errorMsg) {
logger.WithTaskID(task.TaskID).Error(errorMsg)
}
sourceFileLen := task.SourceFileLength
if isSuccess && task.SourceFileLength <= 0 {
sourceFileLen = downloadMetadata.realSourceFileLength
Expand Down
2 changes: 1 addition & 1 deletion cdnsystem/daemon/cdn/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,5 @@ func (suite *CDNManagerTestSuite) TestTriggerCDN() {
cacheSeedTask, err := suite.cm.TriggerCDN(context.Background(), gotSeedTask)
suite.Nil(err)
suite.Equal(targetTask, cacheSeedTask)
// todo test range download
// TODO test range download
}
14 changes: 8 additions & 6 deletions cdnsystem/server/service/cdn_seed_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"d7y.io/dragonfly/v2/internal/dfcodes"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
"d7y.io/dragonfly/v2/pkg/util/net/iputils"
Expand Down Expand Up @@ -117,8 +118,8 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe
peerID := cdnutil.GenCDNPeerID(req.TaskId)
for piece := range pieceChan {
psc <- &cdnsystem.PieceSeed{
PeerId: peerID,
SeederName: iputils.HostName,
PeerId: peerID,
HostUuid: idgen.CDNUUID(iputils.HostName, int32(css.cfg.ListenPort)),
PieceInfo: &base.PieceInfo{
PieceNum: piece.PieceNum,
RangeStart: piece.PieceRange.StartIndex,
Expand All @@ -136,10 +137,11 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe
return dferrors.Newf(dfcodes.CdnTaskDownloadFail, "task(%s) status error , status: %s", req.TaskId, task.CdnStatus)
}
psc <- &cdnsystem.PieceSeed{
PeerId: peerID,
SeederName: iputils.HostName,
Done: true,
ContentLength: task.SourceFileLength,
PeerId: peerID,
HostUuid: idgen.CDNUUID(iputils.HostName, int32(css.cfg.ListenPort)),
Done: true,
ContentLength: task.SourceFileLength,
TotalPieceCount: task.PieceTotal,
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (*clientDaemon) prepareTCPListener(opt config.ListenOption, withTLS bool) (

func (cd *clientDaemon) Serve() error {
cd.GCManager.Start()
// todo remove this field, and use directly dfpath.DaemonSockPath
// TODO remove this field, and use directly dfpath.DaemonSockPath
cd.Option.Download.DownloadGRPC.UnixListen.Socket = dfpath.DaemonSockPath
// prepare download service listen
if cd.Option.Download.DownloadGRPC.UnixListen == nil {
Expand Down
5 changes: 4 additions & 1 deletion cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package main

import "d7y.io/dragonfly/v2/cmd/scheduler/cmd"
import (
"d7y.io/dragonfly/v2/cmd/scheduler/cmd"
_ "d7y.io/dragonfly/v2/scheduler/core/scheduler/basic"
)

func main() {
cmd.Execute()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ require (
gorm.io/driver/mysql v1.0.5
gorm.io/gorm v1.21.6
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
k8s.io/apimachinery v0.20.6
k8s.io/apimachinery v0.20.6 // indirect
k8s.io/client-go v11.0.0+incompatible
)
Loading

0 comments on commit f535dfb

Please sign in to comment.