feat: Add schedule log (#495)
* feat: add bad node log

Signed-off-by: santong <weipeng.swp@alibaba-inc.com>
244372610 authored Jul 26, 2021
1 parent fea504f commit 2b18530
Showing 6 changed files with 53 additions and 17 deletions.
8 changes: 6 additions & 2 deletions internal/dflog/logger.go
@@ -94,9 +94,9 @@ func WithTaskID(taskID string) *SugaredLoggerOnWith {
 	}
 }
 
-func WithPeerID(peerID string) *SugaredLoggerOnWith {
+func WithTaskAndPeerID(taskID string, peerID string) *SugaredLoggerOnWith {
 	return &SugaredLoggerOnWith{
-		withArgs: []interface{}{"peerID", peerID},
+		withArgs: []interface{}{"taskId", taskID, "peerID", peerID},
 	}
 }

@@ -128,6 +128,10 @@ func (log *SugaredLoggerOnWith) Debugf(template string, args ...interface{}) {
 	CoreLogger.Debugw(fmt.Sprintf(template, args...), log.withArgs...)
 }
 
+func (log *SugaredLoggerOnWith) Debug(args ...interface{}) {
+	CoreLogger.Debugw(fmt.Sprint(args...), log.withArgs...)
+}
+
 func Infof(template string, args ...interface{}) {
 	CoreLogger.Infof(template, args...)
 }
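The helpers above are the basis for the rest of this commit. As a quick illustrative sketch (the IDs and call sites are invented here, not taken from the diff), the new API is used like this:

	// Hypothetical usage of the helpers added above. WithTaskAndPeerID
	// attaches both IDs as structured fields, so every call on the
	// returned logger carries them automatically.
	log := logger.WithTaskAndPeerID("task-123", "peer-456")
	log.Debug("peer registered")         // new Debug method, plain message
	log.Debugf("downloaded piece %d", 7) // existing Debugf, formatted message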
2 changes: 1 addition & 1 deletion pkg/rpc/dfdaemon/client/down_result_stream.go
@@ -85,7 +85,7 @@ func (drs *DownResultStream) Recv() (dr *dfdaemon.DownResult, err error) {
 	defer func() {
 		if dr != nil {
 			if dr.TaskId != drs.hashKey {
-				logger.WithPeerID(dr.PeerId).Warnf("down result stream correct taskId from %s to %s", drs.hashKey, dr.TaskId)
+				logger.WithTaskAndPeerID(dr.TaskId, dr.PeerId).Warnf("down result stream correct taskId from %s to %s", drs.hashKey, dr.TaskId)
 				drs.dc.Connection.CorrectKey2NodeRelation(drs.hashKey, dr.TaskId)
 				drs.hashKey = dr.TaskId
 			}
5 changes: 3 additions & 2 deletions pkg/rpc/scheduler/client/client.go
@@ -86,7 +86,8 @@ func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *schedule
 		res interface{}
 	)
 	key := idgen.TaskID(ptr.Url, ptr.Filter, ptr.UrlMeta, ptr.BizId)
-	logger.WithPeerID(ptr.PeerId).Infof("generate hash key taskId: %s and start to register peer task for peer_id(%s) url(%s)", key, ptr.PeerId, ptr.Url)
+	logger.WithTaskAndPeerID(key, ptr.PeerId).Infof("generate hash key taskId: %s and start to register peer task for peer_id(%s) url(%s)", key, ptr.PeerId,
+		ptr.Url)
 	reg := func() (interface{}, error) {
 		var client scheduler.SchedulerClient
 		client, schedulerNode, err = sc.getSchedulerClient(key, false)
@@ -102,7 +103,7 @@ func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *schedule
 		taskID = rr.TaskId
 		code = dfcodes.Success
 		if taskID != key {
-			logger.WithPeerID(ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskID)
+			logger.WithTaskAndPeerID(taskID, ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskID)
 			sc.Connection.CorrectKey2NodeRelation(key, taskID)
 		}
 		logger.With("peerId", ptr.PeerId).
16 changes: 11 additions & 5 deletions scheduler/core/evaluator/basic/basic_evaluator.go
@@ -41,10 +41,12 @@ func (eval *baseEvaluator) NeedAdjustParent(peer *types.Peer) bool {
 	}
 
 	if peer.GetParent() == nil && !peer.IsDone() {
+		logger.Debugf("peer %s needs to adjust its parent because it has no parent and status is %s", peer.PeerID, peer.GetStatus())
 		return true
 	}
 
 	if peer.GetParent() != nil && eval.IsBadNode(peer.GetParent()) {
+		logger.Debugf("peer %s needs to adjust its parent because the current parent is bad", peer.PeerID)
 		return true
 	}
 	costHistory := peer.GetCostHistory()
@@ -54,10 +56,14 @@ func (eval *baseEvaluator) NeedAdjustParent(peer *types.Peer) bool {
 
 	avgCost, lastCost := getAvgAndLastCost(costHistory, 4)
 	if avgCost*40 < lastCost {
-		logger.Debugf("IsBadNode [%s]: recent pieces have taken too long to download", peer.PeerID)
+		logger.Debugf("peer %s is bad because recent pieces have taken too long to download", peer.PeerID)
 	}
 	// TODO adjust policy
-	return (avgCost * 20) < lastCost
+	result := (avgCost * 20) < lastCost
+	if result {
+		logger.Debugf("peer %s needs to adjust its parent because the latest download cost is too high", peer.PeerID)
+	}
+	return result
 }
 
 func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool {
@@ -66,6 +72,7 @@ func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool {
 	}
 
 	if peer.IsBad() {
+		logger.Debugf("peer %s is bad because status is %s", peer.PeerID, peer.GetStatus())
 		return true
 	}
 
@@ -78,9 +85,8 @@ func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool {
 	if parent == nil {
 		return false
 	}
-	logger.Debugf("IsBadNode [%s]: %s have elapsed since the last access %s, now %s", time.Now().Sub(peer.GetLastAccessTime()), peer.PeerID,
-		peer.GetLastAccessTime(), time.Now())
 	if time.Now().After(peer.GetLastAccessTime().Add(5 * time.Second)) {
+		logger.Debugf("peer %s is bad because %s (> 5s) has elapsed since the last access", peer.PeerID, time.Now().Sub(peer.GetLastAccessTime()))
 		return true
 	}
 
@@ -92,7 +98,7 @@ func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool {
 	avgCost, lastCost := getAvgAndLastCost(costHistory, 4)
 
 	if avgCost*40 < lastCost {
-		logger.Debugf("IsNodeBad [%s]: recent pieces have taken too long to download avg[%d] last[%d]", peer.PeerID, avgCost, lastCost)
+		logger.Debugf("peer %s is bad because recent pieces have taken too long to download avg[%d] last[%d]", peer.PeerID, avgCost, lastCost)
 		return true
 	}
 
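The thresholds above encode the commit's heuristic: a peer needs a new parent when its latest piece cost exceeds 20 times the recent average, and it counts as a bad node when the latest cost exceeds 40 times the average or more than 5s have passed since its last access. getAvgAndLastCost itself is not part of this diff; a plausible sketch, assuming integer costs and an average over the last n history entries, would be:

	// Sketch only: getAvgAndLastCost is not shown in this commit.
	// Assumed behavior: average the last n entries of the cost history
	// and return that average together with the most recent cost.
	func getAvgAndLastCost(costHistory []int, n int) (avgCost, lastCost int) {
		if len(costHistory) == 0 {
			return 0, 0
		}
		if n > len(costHistory) {
			n = len(costHistory)
		}
		sum := 0
		for _, cost := range costHistory[len(costHistory)-n:] {
			sum += cost
		}
		return sum / n, costHistory[len(costHistory)-1]
	}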
4 changes: 2 additions & 2 deletions scheduler/core/monitor.go
@@ -116,7 +116,7 @@ func (m *monitor) printDebugInfo() string {
 }
 
 func (m *monitor) RefreshDownloadMonitor(peer *types.Peer) {
-	logger.Debugf("[%s][%s] downloadMonitorWorkingLoop refresh", peer.Task.TaskID, peer.PeerID)
+	logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("downloadMonitorWorkingLoop refresh")
 	if !peer.IsRunning() {
 		m.downloadMonitorQueue.AddAfter(peer, time.Second*2)
 	} else if peer.IsWaiting() {
@@ -141,7 +141,7 @@ func (m *monitor) downloadMonitorWorkingLoop() {
 		//if m.downloadMonitorCallBack != nil {
 		peer := v.(*types.Peer)
 		if peer != nil {
-			logger.Debugf("[%s][%s] downloadMonitorWorkingLoop status[%d]", peer.Task.TaskID, peer.PeerID, peer.GetStatus())
+			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("downloadMonitorWorkingLoop status[%d]", peer.GetStatus())
 			if peer.IsSuccess() || peer.Host.CDN {
 				// clear from monitor
 			} else {
35 changes: 30 additions & 5 deletions scheduler/core/scheduler/basic/basic_scheduler.go
@@ -71,12 +71,14 @@ type Scheduler struct {
 }
 
 func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer) {
-	logger.Debugf("[%s][%s]scheduler children", peer.Task.TaskID, peer.PeerID)
+	logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start the schedule children flow")
 	if s.evaluator.IsBadNode(peer) {
+		logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("peer is a bad node")
 		return
 	}
 	freeUpload := peer.Host.GetFreeUploadLoad()
 	candidateChildren := s.selectCandidateChildren(peer, freeUpload*2)
+	logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("selected %d candidate children %v", len(candidateChildren), candidateChildren)
 	evalResult := make(map[float64]*types.Peer)
 	var evalScore []float64
 	for _, child := range candidateChildren {
@@ -99,19 +101,21 @@ func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer)
 	for _, child := range children {
 		child.ReplaceParent(peer)
 	}
+	logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("final schedule children list %v", children)
 	return
 }
 
 func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer, bool) {
-	logger.Debugf("[%s][%s]scheduler parent", peer.Task.TaskID, peer.PeerID)
+	logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start the schedule parent flow")
 	if !s.evaluator.NeedAdjustParent(peer) {
+		logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace its parent, current parent is %v", peer.GetParent())
 		if peer.GetParent() == nil {
 			return nil, nil, false
 		}
 		return peer.GetParent(), []*types.Peer{peer.GetParent()}, true
 	}
 	candidateParents := s.selectCandidateParents(peer, s.cfg.CandidateParentCount)
-	logger.Debugf("[%s][%s]select num %d candidates", peer.Task.TaskID, peer.PeerID, len(candidateParents))
+	logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("selected %d candidate parents %v", len(candidateParents), candidateParents)
 	var value float64
 	var primary = peer.GetParent()
 	for _, candidate := range candidateParents {
@@ -138,29 +142,50 @@
 
 func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list []*types.Peer) {
 	return s.peerManager.Pick(peer.Task, limit, func(candidateNode *types.Peer) bool {
-		if candidateNode == nil || candidateNode.IsDone() || candidateNode.IsLeave() || candidateNode == peer {
+		if candidateNode == nil {
+			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer is not selected because it is nil")
+			return false
+		}
+		if candidateNode.IsDone() || candidateNode.IsLeave() || candidateNode == peer {
+			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is not selected because it is %v",
+				candidateNode.PeerID, candidateNode)
 			return false
 		}
 		if candidateNode.Host != nil && candidateNode.Host.CDN {
+			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is not selected because it is a cdn host", candidateNode.PeerID)
 			return false
 		}
 		if candidateNode.GetParent() == nil {
+			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is selected because it has no parent",
+				candidateNode.PeerID)
 			return true
 		}
 
 		if candidateNode.GetParent() != nil && s.evaluator.IsBadNode(candidateNode.GetParent()) {
+			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is selected because its parent is unhealthy",
+				candidateNode.PeerID)
 			return true
 		}
+		logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is not selected because it is %v",
+			candidateNode.PeerID, candidateNode)
 		return false
 	})
 }
 
 func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list []*types.Peer) {
 	return s.peerManager.PickReverse(peer.Task, limit, func(candidateNode *types.Peer) bool {
-		if candidateNode == nil || s.evaluator.IsBadNode(candidateNode) || candidateNode.IsLeave() || candidateNode == peer || candidateNode.Host.
-			GetFreeUploadLoad() <= 0 {
+		if candidateNode == nil {
+			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer is not selected because it is nil")
+			return false
+		}
+		if s.evaluator.IsBadNode(candidateNode) || candidateNode.IsLeave() || candidateNode == peer || candidateNode.Host.GetFreeUploadLoad() <= 0 {
+			logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is not selected because it is %v",
+				candidateNode.PeerID, candidateNode)
+			return false
+		}
+		logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is selected because it is %v",
+			candidateNode.PeerID, candidateNode)
 		return true
 	})
 }
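selectCandidateParents feeds ScheduleParent, which returns the chosen primary parent, the evaluated candidate list, and whether any parent is available. An illustrative caller, assuming a scheduler s and a peer p (not code from this commit), might look like:

	// Hypothetical consumer of ScheduleParent's three return values.
	parent, candidates, hasParent := s.ScheduleParent(p)
	if !hasParent {
		// nothing suitable right now; the download monitor will requeue the peer
		logger.WithTaskAndPeerID(p.Task.TaskID, p.PeerID).Debug("no parent scheduled")
		return
	}
	logger.WithTaskAndPeerID(p.Task.TaskID, p.PeerID).Debugf("scheduled parent %v from %d candidates", parent, len(candidates))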
