From 2b1853034b24e66377305dfb6f9aa77dbcedacb1 Mon Sep 17 00:00:00 2001
From: sunwp <244372610@qq.com>
Date: Mon, 26 Jul 2021 15:35:33 +0800
Subject: [PATCH] feat: Add schedule log (#495)

* feat: add bad node log

Signed-off-by: santong
---
 internal/dflog/logger.go                      |  8 +++--
 pkg/rpc/dfdaemon/client/down_result_stream.go |  2 +-
 pkg/rpc/scheduler/client/client.go            |  5 +--
 .../core/evaluator/basic/basic_evaluator.go   | 16 ++++++---
 scheduler/core/monitor.go                     |  4 +--
 .../core/scheduler/basic/basic_scheduler.go   | 35 ++++++++++++++++---
 6 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/internal/dflog/logger.go b/internal/dflog/logger.go
index 074e6e70b2f..3689ca5d9a2 100644
--- a/internal/dflog/logger.go
+++ b/internal/dflog/logger.go
@@ -94,9 +94,9 @@ func WithTaskID(taskID string) *SugaredLoggerOnWith {
     }
 }
 
-func WithPeerID(peerID string) *SugaredLoggerOnWith {
+func WithTaskAndPeerID(taskID string, peerID string) *SugaredLoggerOnWith {
     return &SugaredLoggerOnWith{
-        withArgs: []interface{}{"peerID", peerID},
+        withArgs: []interface{}{"taskId", taskID, "peerID", peerID},
     }
 }
 
@@ -128,6 +128,10 @@ func (log *SugaredLoggerOnWith) Debugf(template string, args ...interface{}) {
     CoreLogger.Debugw(fmt.Sprintf(template, args...), log.withArgs...)
 }
 
+func (log *SugaredLoggerOnWith) Debug(args ...interface{}) {
+    CoreLogger.Debugw(fmt.Sprint(args...), log.withArgs...)
+}
+
 func Infof(template string, args ...interface{}) {
     CoreLogger.Infof(template, args...)
 }
diff --git a/pkg/rpc/dfdaemon/client/down_result_stream.go b/pkg/rpc/dfdaemon/client/down_result_stream.go
index d62d8f3910e..66d9eafc912 100644
--- a/pkg/rpc/dfdaemon/client/down_result_stream.go
+++ b/pkg/rpc/dfdaemon/client/down_result_stream.go
@@ -85,7 +85,7 @@ func (drs *DownResultStream) Recv() (dr *dfdaemon.DownResult, err error) {
     defer func() {
         if dr != nil {
             if dr.TaskId != drs.hashKey {
-                logger.WithPeerID(dr.PeerId).Warnf("down result stream correct taskId from %s to %s", drs.hashKey, dr.TaskId)
+                logger.WithTaskAndPeerID(dr.TaskId, dr.PeerId).Warnf("down result stream correct taskId from %s to %s", drs.hashKey, dr.TaskId)
                 drs.dc.Connection.CorrectKey2NodeRelation(drs.hashKey, dr.TaskId)
                 drs.hashKey = dr.TaskId
             }
diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go
index f973f6a0cd1..a715426c8aa 100644
--- a/pkg/rpc/scheduler/client/client.go
+++ b/pkg/rpc/scheduler/client/client.go
@@ -86,7 +86,8 @@ func (sc *schedulerClient) doRegisterPeerTask(ctx context.Context, ptr *schedule
         res           interface{}
     )
     key := idgen.TaskID(ptr.Url, ptr.Filter, ptr.UrlMeta, ptr.BizId)
-    logger.WithPeerID(ptr.PeerId).Infof("generate hash key taskId: %s and start to register peer task for peer_id(%s) url(%s)", key, ptr.PeerId, ptr.Url)
+    logger.WithTaskAndPeerID(key, ptr.PeerId).Infof("generate hash key taskId: %s and start to register peer task for peer_id(%s) url(%s)", key, ptr.PeerId,
+        ptr.Url)
     reg := func() (interface{}, error) {
         var client scheduler.SchedulerClient
         client, schedulerNode, err = sc.getSchedulerClient(key, false)
@@ -102,7 +103,7 @@
         taskID = rr.TaskId
         code = dfcodes.Success
         if taskID != key {
-            logger.WithPeerID(ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskID)
+            logger.WithTaskAndPeerID(taskID, ptr.PeerId).Warnf("register peer task correct taskId from %s to %s", key, taskID)
             sc.Connection.CorrectKey2NodeRelation(key, taskID)
         }
         logger.With("peerId", ptr.PeerId).
diff --git a/scheduler/core/evaluator/basic/basic_evaluator.go b/scheduler/core/evaluator/basic/basic_evaluator.go
index e2ffcf5f0aa..1808bdcb69d 100644
--- a/scheduler/core/evaluator/basic/basic_evaluator.go
+++ b/scheduler/core/evaluator/basic/basic_evaluator.go
@@ -41,10 +41,12 @@ func (eval *baseEvaluator) NeedAdjustParent(peer *types.Peer) bool {
     }
 
     if peer.GetParent() == nil && !peer.IsDone() {
+        logger.Debugf("peer %s needs to adjust parent because it has no parent and its status is %s", peer.PeerID, peer.GetStatus())
         return true
     }
 
     if peer.GetParent() != nil && eval.IsBadNode(peer.GetParent()) {
+        logger.Debugf("peer %s needs to adjust parent because its current parent is bad", peer.PeerID)
         return true
     }
     costHistory := peer.GetCostHistory()
@@ -54,10 +56,14 @@ func (eval *baseEvaluator) NeedAdjustParent(peer *types.Peer) bool {
     avgCost, lastCost := getAvgAndLastCost(costHistory, 4)
 
     if avgCost*40 < lastCost {
-        logger.Debugf("IsBadNode [%s]: recent pieces have taken too long to download", peer.PeerID)
+        logger.Debugf("peer %s is bad because recent pieces have taken too long to download", peer.PeerID)
     }
     // TODO adjust policy
-    return (avgCost * 20) < lastCost
+    result := (avgCost * 20) < lastCost
+    if result {
+        logger.Debugf("peer %s needs to adjust parent because its latest download cost is too high", peer.PeerID)
+    }
+    return result
 }
 
 func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool {
@@ -66,6 +72,7 @@ func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool {
     }
 
     if peer.IsBad() {
+        logger.Debugf("peer %s is bad because its status is %s", peer.PeerID, peer.GetStatus())
         return true
     }
 
@@ -78,9 +85,8 @@ func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool {
     if parent == nil {
         return false
     }
-    logger.Debugf("IsBadNode [%s]: %s have elapsed since the last access %s, now %s", time.Now().Sub(peer.GetLastAccessTime()), peer.PeerID,
-        peer.GetLastAccessTime(), time.Now())
     if time.Now().After(peer.GetLastAccessTime().Add(5 * time.Second)) {
+        logger.Debugf("peer %s is bad because %s has elapsed since the last access (> 5s)", peer.PeerID, time.Since(peer.GetLastAccessTime()))
         return true
     }
 
@@ -92,7 +98,7 @@ func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool {
     avgCost, lastCost := getAvgAndLastCost(costHistory, 4)
 
     if avgCost*40 < lastCost {
-        logger.Debugf("IsNodeBad [%s]: recent pieces have taken too long to download avg[%d] last[%d]", peer.PeerID, avgCost, lastCost)
+        logger.Debugf("peer %s is bad because recent pieces have taken too long to download avg[%d] last[%d]", peer.PeerID, avgCost, lastCost)
         return true
     }
 
diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go
index 1f9875df1c1..1bba95aae96 100644
--- a/scheduler/core/monitor.go
+++ b/scheduler/core/monitor.go
@@ -116,7 +116,7 @@ func (m *monitor) printDebugInfo() string {
 }
 
 func (m *monitor) RefreshDownloadMonitor(peer *types.Peer) {
-    logger.Debugf("[%s][%s] downloadMonitorWorkingLoop refresh ", peer.Task.TaskID, peer.PeerID)
+    logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("downloadMonitorWorkingLoop refresh")
     if !peer.IsRunning() {
         m.downloadMonitorQueue.AddAfter(peer, time.Second*2)
     } else if peer.IsWaiting() {
@@ -141,7 +141,7 @@ func (m *monitor) downloadMonitorWorkingLoop() {
             //if m.downloadMonitorCallBack != nil {
             peer := v.(*types.Peer)
             if peer != nil {
-                logger.Debugf("[%s][%s] downloadMonitorWorkingLoop status[%d]", peer.Task.TaskID, peer.PeerID, peer.GetStatus())
+                logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("downloadMonitorWorkingLoop status[%d]", peer.GetStatus())
                 if peer.IsSuccess() || peer.Host.CDN {
                     // clear from monitor
                 } else {
diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go
index 4ccb14b5bb7..e2dbd84ddf2 100644
--- a/scheduler/core/scheduler/basic/basic_scheduler.go
+++ b/scheduler/core/scheduler/basic/basic_scheduler.go
@@ -71,12 +71,14 @@ type Scheduler struct {
 }
 
 func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer) {
-    logger.Debugf("[%s][%s]scheduler children", peer.Task.TaskID, peer.PeerID)
+    logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start scheduling children flow")
     if s.evaluator.IsBadNode(peer) {
+        logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("peer is a bad node")
         return
     }
     freeUpload := peer.Host.GetFreeUploadLoad()
     candidateChildren := s.selectCandidateChildren(peer, freeUpload*2)
+    logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("selected %d candidate children %v", len(candidateChildren), candidateChildren)
     evalResult := make(map[float64]*types.Peer)
     var evalScore []float64
     for _, child := range candidateChildren {
@@ -99,19 +101,21 @@ func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer)
     for _, child := range children {
         child.ReplaceParent(peer)
     }
+    logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("final scheduled children list %v", children)
     return
 }
 
 func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer, bool) {
-    logger.Debugf("[%s][%s]scheduler parent", peer.Task.TaskID, peer.PeerID)
+    logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start scheduling parent flow")
     if !s.evaluator.NeedAdjustParent(peer) {
+        logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace its parent, current parent is %v", peer.GetParent())
         if peer.GetParent() == nil {
             return nil, nil, false
         }
         return peer.GetParent(), []*types.Peer{peer.GetParent()}, true
     }
     candidateParents := s.selectCandidateParents(peer, s.cfg.CandidateParentCount)
-    logger.Debugf("[%s][%s]select num %d candidates", peer.Task.TaskID, peer.PeerID, len(candidateParents))
+    logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("selected %d parent candidates %v", len(candidateParents), candidateParents)
     var value float64
     var primary = peer.GetParent()
     for _, candidate := range candidateParents {
@@ -138,29 +142,50 @@ func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer
 
 func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list []*types.Peer) {
     return s.peerManager.Pick(peer.Task, limit, func(candidateNode *types.Peer) bool {
-        if candidateNode == nil || candidateNode.IsDone() || candidateNode.IsLeave() || candidateNode == peer {
+        if candidateNode == nil {
+            logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer is not selected because it is nil")
+            return false
+        }
+        if candidateNode.IsDone() || candidateNode.IsLeave() || candidateNode == peer {
+            logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is not selected because it is %v",
+                candidateNode.PeerID, candidateNode)
             return false
         }
         if candidateNode.Host != nil && candidateNode.Host.CDN {
+            logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is not selected because it is a cdn host", candidateNode.PeerID)
             return false
         }
         if candidateNode.GetParent() == nil {
+            logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is selected because it has no parent",
+                candidateNode.PeerID)
             return true
         }
        if candidateNode.GetParent() != nil && s.evaluator.IsBadNode(candidateNode.GetParent()) {
+            logger.WithTaskAndPeerID(peer.Task.TaskID,
+                peer.PeerID).Debugf("candidate child peer %s is selected because its current parent is not healthy", candidateNode.PeerID)
             return true
         }
+        logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate child peer %s is not selected because it is %v",
+            candidateNode.PeerID, candidateNode)
         return false
     })
 }
 
 func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list []*types.Peer) {
     return s.peerManager.PickReverse(peer.Task, limit, func(candidateNode *types.Peer) bool {
-        if candidateNode == nil || s.evaluator.IsBadNode(candidateNode) || candidateNode.IsLeave() || candidateNode == peer || candidateNode.Host.
+        if candidateNode == nil {
+            logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer is not selected because it is nil")
+            return false
+        }
+        if s.evaluator.IsBadNode(candidateNode) || candidateNode.IsLeave() || candidateNode == peer || candidateNode.Host.
             GetFreeUploadLoad() <= 0 {
+            logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is not selected because it is %v",
+                candidateNode.PeerID, candidateNode)
             return false
         }
+        logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is selected because it is %v",
+            candidateNode.PeerID, candidateNode)
         return true
     })
 }
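
Note on usage: below is a minimal sketch of how the logging helpers this patch introduces are meant to be called from scheduler code. The import alias `logger` follows the convention in the files above; the import path and the placeholder IDs are assumptions for illustration, not part of the patch.

    package main

    // Illustrative sketch only: internal packages cannot be imported from
    // outside the Dragonfly module, so this shows how a file inside the
    // repo would use the helpers after this patch.
    import logger "d7y.io/dragonfly/v2/internal/dflog"

    func main() {
        // WithTaskAndPeerID (renamed from WithPeerID by this patch) attaches
        // both IDs as structured fields, so every entry written through the
        // returned logger can be filtered by taskId or peerID.
        log := logger.WithTaskAndPeerID("task-123", "peer-456") // placeholder IDs

        // Debugf formats its message like fmt.Sprintf; Debug, added by this
        // patch, joins its arguments like fmt.Sprint.
        log.Debugf("selected %d parent candidates", 3)
        log.Debug("start scheduling parent flow")
    }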