Skip to content

Commit

Permalink
let cdn support sha256 (#517)
Browse files Browse the repository at this point in the history
cdn support sha256 digest
  • Loading branch information
zzy987 authored Aug 6, 2021
1 parent 85bfa36 commit 9a0d057
Show file tree
Hide file tree
Showing 18 changed files with 269 additions and 182 deletions.
4 changes: 2 additions & 2 deletions cdnsystem/daemon/cdn/cache_data_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func (mm *cacheDataManager) updateStatusAndResult(taskID string, metaData *stora
if metaData.TotalPieceCount > 0 {
originMetaData.TotalPieceCount = metaData.TotalPieceCount
}
if !stringutils.IsBlank(metaData.SourceRealMd5) {
originMetaData.SourceRealMd5 = metaData.SourceRealMd5
if !stringutils.IsBlank(metaData.SourceRealDigest) {
originMetaData.SourceRealDigest = metaData.SourceRealDigest
}
if !stringutils.IsBlank(metaData.PieceMd5Sign) {
originMetaData.PieceMd5Sign = metaData.PieceMd5Sign
Expand Down
24 changes: 12 additions & 12 deletions cdnsystem/daemon/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func newCacheDetector(cacheDataManager *cacheDataManager) *cacheDetector {
}
}

func (cd *cacheDetector) detectCache(task *types.SeedTask, fileMd5 hash.Hash) (*cacheResult, error) {
func (cd *cacheDetector) detectCache(task *types.SeedTask, fileDigest hash.Hash) (*cacheResult, error) {
//err := cd.cacheStore.CreateUploadLink(ctx, task.TaskId)
//if err != nil {
// return nil, errors.Wrapf(err, "failed to create upload symbolic link")
//}
result, err := cd.doDetect(task, fileMd5)
result, err := cd.doDetect(task, fileDigest)
if err != nil {
logger.WithTaskID(task.TaskID).Infof("failed to detect cache, reset cache: %v", err)
metaData, err := cd.resetCache(task)
Expand All @@ -83,7 +83,7 @@ func (cd *cacheDetector) detectCache(task *types.SeedTask, fileMd5 hash.Hash) (*
}

// doDetect the actual detect action which detects file metaData and pieces metaData of specific task
func (cd *cacheDetector) doDetect(task *types.SeedTask, fileMd5 hash.Hash) (result *cacheResult, err error) {
func (cd *cacheDetector) doDetect(task *types.SeedTask, fileDigest hash.Hash) (result *cacheResult, err error) {
fileMetaData, err := cd.cacheDataManager.readFileMetaData(task.TaskID)
if err != nil {
return nil, errors.Wrapf(err, "read file meta data of task %s", task.TaskID)
Expand Down Expand Up @@ -118,7 +118,7 @@ func (cd *cacheDetector) doDetect(task *types.SeedTask, fileMd5 hash.Hash) (resu
if !supportRange {
return nil, cdnerrors.ErrResourceNotSupportRangeRequest{URL: task.URL}
}
return cd.parseByReadFile(task.TaskID, fileMetaData, fileMd5)
return cd.parseByReadFile(task.TaskID, fileMetaData, fileDigest)
}

// parseByReadMetaFile detect cache by read meta and pieceMeta files of task
Expand Down Expand Up @@ -154,7 +154,7 @@ func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetaData *storag
}

// parseByReadFile detect cache by read pieceMeta and data files of task
func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMetaData, fileMd5 hash.Hash) (*cacheResult, error) {
func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMetaData, fileDigest hash.Hash) (*cacheResult, error) {
reader, err := cd.cacheDataManager.readDownloadFile(taskID)
if err != nil {
return nil, errors.Wrapf(err, "read data file")
Expand All @@ -177,7 +177,7 @@ func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMe
break
}
// read content
if err := checkPieceContent(reader, tempRecords[index], fileMd5); err != nil {
if err := checkPieceContent(reader, tempRecords[index], fileDigest); err != nil {
logger.WithTaskID(taskID).Errorf("read content of pieceNum %d failed: %v", tempRecords[index].PieceNum, err)
break
}
Expand Down Expand Up @@ -237,19 +237,19 @@ func checkSameFile(task *types.SeedTask, metaData *storage.FileMetaData) error {
if metaData.TaskURL != task.TaskURL {
return errors.Errorf("meta task taskUrl(%s) is not equals with task taskUrl(%s)", metaData.TaskURL, task.URL)
}
if !stringutils.IsBlank(metaData.SourceRealMd5) && !stringutils.IsBlank(task.RequestMd5) &&
metaData.SourceRealMd5 != task.RequestMd5 {
return errors.Errorf("meta task source md5(%s) is not equals with task request md5(%s)",
metaData.SourceRealMd5, task.RequestMd5)
if !stringutils.IsBlank(metaData.SourceRealDigest) && !stringutils.IsBlank(task.RequestDigest) &&
metaData.SourceRealDigest != task.RequestDigest {
return errors.Errorf("meta task source digest(%s) is not equals with task request digest(%s)",
metaData.SourceRealDigest, task.RequestDigest)
}
return nil
}

//checkPieceContent read piece content from reader and check data integrity by pieceMetaRecord
func checkPieceContent(reader io.Reader, pieceRecord *storage.PieceMetaRecord, fileMd5 hash.Hash) error {
func checkPieceContent(reader io.Reader, pieceRecord *storage.PieceMetaRecord, fileDigest hash.Hash) error {
// TODO Analyze the original data for the slice format to calculate fileMd5
pieceMd5 := md5.New()
tee := io.TeeReader(io.TeeReader(io.LimitReader(reader, int64(pieceRecord.PieceLen)), pieceMd5), fileMd5)
tee := io.TeeReader(io.TeeReader(io.LimitReader(reader, int64(pieceRecord.PieceLen)), pieceMd5), fileDigest)
if n, err := io.Copy(ioutil.Discard, tee); n != int64(pieceRecord.PieceLen) || err != nil {
return errors.Wrap(err, "read piece content")
}
Expand Down
52 changes: 26 additions & 26 deletions cdnsystem/daemon/cdn/cache_detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,37 +218,37 @@ var fullPieceMetaRecords = append(partialPieceMetaRecords, &storage.PieceMetaRec

func newCompletedFileMeta(taskID string, URL string, success bool) *storage.FileMetaData {
return &storage.FileMetaData{
TaskID: taskID,
TaskURL: URL,
PieceSize: 2000,
SourceFileLen: 9789,
AccessTime: 1624126443284,
Interval: 0,
CdnFileLength: 9789,
SourceRealMd5: "",
PieceMd5Sign: "98166bdfebb7b71dd5c6d47492d844f4421d90199641ca11fd8ce3111894115a",
ExpireInfo: nil,
Finish: true,
Success: success,
TotalPieceCount: 5,
TaskID: taskID,
TaskURL: URL,
PieceSize: 2000,
SourceFileLen: 9789,
AccessTime: 1624126443284,
Interval: 0,
CdnFileLength: 9789,
SourceRealDigest: "",
PieceMd5Sign: "98166bdfebb7b71dd5c6d47492d844f4421d90199641ca11fd8ce3111894115a",
ExpireInfo: nil,
Finish: true,
Success: success,
TotalPieceCount: 5,
}
}

func newPartialFileMeta(taskID string, URL string) *storage.FileMetaData {
return &storage.FileMetaData{
TaskID: taskID,
TaskURL: URL,
PieceSize: 2000,
SourceFileLen: 9789,
AccessTime: 1624126443284,
Interval: 0,
CdnFileLength: 0,
SourceRealMd5: "",
PieceMd5Sign: "",
ExpireInfo: nil,
Finish: false,
Success: false,
TotalPieceCount: 0,
TaskID: taskID,
TaskURL: URL,
PieceSize: 2000,
SourceFileLen: 9789,
AccessTime: 1624126443284,
Interval: 0,
CdnFileLength: 0,
SourceRealDigest: "",
PieceMd5Sign: "",
ExpireInfo: nil,
Finish: false,
Success: false,
TotalPieceCount: 0,
}
}

Expand Down
47 changes: 28 additions & 19 deletions cdnsystem/daemon/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package cdn

import (
"crypto/md5"
"time"

"d7y.io/dragonfly/v2/pkg/util/digestutils"

"context"
"crypto/md5"
"fmt"

"d7y.io/dragonfly/v2/cdnsystem/daemon"
Expand Down Expand Up @@ -85,9 +87,16 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
// obtain taskId write lock
cm.cdnLocker.Lock(task.TaskID, false)
defer cm.cdnLocker.UnLock(task.TaskID, false)

var fileDigest = md5.New()
var digestType = digestutils.Md5Hash.String()
if !stringutils.IsBlank(task.RequestDigest) {
requestDigest := digestutils.Parse(task.RequestDigest)
digestType = requestDigest[0]
fileDigest = digestutils.CreateHash(digestType)
}
// first: detect Cache
fileMd5 := md5.New()
detectResult, err := cm.detector.detectCache(task, fileMd5)
detectResult, err := cm.detector.detectCache(task, fileDigest)
if err != nil {
seedTask.UpdateStatus(types.TaskInfoCdnStatusFailed)
return seedTask, errors.Wrapf(err, "failed to detect cache")
Expand All @@ -101,7 +110,7 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
// full cache
if detectResult.breakPoint == -1 {
logger.WithTaskID(task.TaskID).Infof("cache full hit on local")
seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetaData.SourceRealMd5, detectResult.fileMetaData.PieceMd5Sign,
seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetaData.SourceRealDigest, detectResult.fileMetaData.PieceMd5Sign,
detectResult.fileMetaData.SourceFileLen, detectResult.fileMetaData.CdnFileLength)
return seedTask, nil
}
Expand All @@ -116,8 +125,8 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
return seedTask, err
}
defer body.Close()
reader := limitreader.NewLimitReaderWithLimiterAndDigest(body, cm.limiter, fileDigest, digestutils.Algorithms[digestType])

reader := limitreader.NewLimitReaderWithLimiterAndMD5Sum(body, cm.limiter, fileMd5)
// forth: write to storage
downloadMetadata, err := cm.writer.startWriter(reader, task, detectResult)
if err != nil {
Expand All @@ -129,14 +138,14 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
}
server.StatSeedFinish(task.TaskID, task.URL, true, nil, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength,
downloadMetadata.realSourceFileLength)
sourceMD5 := reader.Md5()
sourceDigest := reader.Digest()
// fifth: handle CDN result
success, err := cm.handleCDNResult(task, sourceMD5, downloadMetadata)
success, err := cm.handleCDNResult(task, sourceDigest, downloadMetadata)
if err != nil || !success {
seedTask.UpdateStatus(types.TaskInfoCdnStatusFailed)
return seedTask, err
}
seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, sourceMD5, downloadMetadata.pieceMd5Sign,
seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, sourceDigest, downloadMetadata.pieceMd5Sign,
downloadMetadata.realSourceFileLength, downloadMetadata.realCdnFileLength)
return seedTask, nil
}
Expand All @@ -149,13 +158,13 @@ func (cm *Manager) Delete(taskID string) error {
return nil
}

func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceMd5 string, downloadMetadata *downloadMetadata) (bool, error) {
func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, downloadMetadata *downloadMetadata) (bool, error) {
logger.WithTaskID(task.TaskID).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata)
var isSuccess = true
var errorMsg string
// check md5
if !stringutils.IsBlank(task.RequestMd5) && task.RequestMd5 != sourceMd5 {
errorMsg = fmt.Sprintf("file md5 not match expected: %s real: %s", task.RequestMd5, sourceMd5)
if !stringutils.IsBlank(task.RequestDigest) && task.RequestDigest != sourceDigest {
errorMsg = fmt.Sprintf("file digest not match expected: %s real: %s", task.RequestDigest, sourceDigest)
isSuccess = false
}
// check source length
Expand All @@ -178,13 +187,13 @@ func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceMd5 string, downl
cdnFileLength = 0
}
if err := cm.cacheDataManager.updateStatusAndResult(task.TaskID, &storage.FileMetaData{
Finish: true,
Success: isSuccess,
SourceRealMd5: sourceMd5,
PieceMd5Sign: pieceMd5Sign,
CdnFileLength: cdnFileLength,
SourceFileLen: sourceFileLen,
TotalPieceCount: downloadMetadata.pieceTotalCount,
Finish: true,
Success: isSuccess,
SourceRealDigest: sourceDigest,
PieceMd5Sign: pieceMd5Sign,
CdnFileLength: cdnFileLength,
SourceFileLen: sourceFileLen,
TotalPieceCount: downloadMetadata.pieceTotalCount,
}); err != nil {
return false, errors.Wrap(err, "failed to update task status and result")
}
Expand All @@ -193,7 +202,7 @@ func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceMd5 string, downl
return false, errors.New(errorMsg)
}

logger.WithTaskID(task.TaskID).Infof("success to get task, downloadMetadata: %+v realMd5: %s", downloadMetadata, sourceMd5)
logger.WithTaskID(task.TaskID).Infof("success to get task, downloadMetadata: %+v realDigest: %s", downloadMetadata, sourceDigest)

return true, nil
}
Expand Down
Loading

0 comments on commit 9a0d057

Please sign in to comment.