diff --git a/pkg/node/node.go b/pkg/node/node.go index 1fe13330bd2..3abfeb264e8 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -929,7 +929,7 @@ func NewBee( return nil, fmt.Errorf("status service: %w", err) } - saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultPercentile) + saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile) b.saludCloser = saludService rC, unsub := saludService.SubscribeNetworkStorageRadius() diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index 687fa9b9f50..49cc36696e8 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -259,6 +259,7 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even nextExpectedBatchBlock := (lastConfirmedBlock/batchFactor + 1) * batchFactor remainingBlocks := nextExpectedBatchBlock - lastConfirmedBlock expectedWaitTime = l.blockTime * time.Duration(remainingBlocks) + l.logger.Debug("sleeping until next block batch", "duration", expectedWaitTime) } else { expectedWaitTime = l.backoffTime } diff --git a/pkg/retrieval/retrieval.go b/pkg/retrieval/retrieval.go index 22564ff5745..551f08e6310 100644 --- a/pkg/retrieval/retrieval.go +++ b/pkg/retrieval/retrieval.go @@ -120,7 +120,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec { } const ( - retrieveChunkTimeout = time.Second * 10 + retrieveChunkTimeout = time.Second * 30 preemptiveInterval = time.Second overDraftRefresh = time.Millisecond * 600 skiplistDur = time.Minute diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index d2f491c2c3c..d9a661c40af 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -24,10 +24,11 @@ import ( const loggerName = "salud" const ( - wakeup = time.Minute - requestTimeout = time.Second * 10 - DefaultMinPeersPerBin = 
4 - DefaultPercentile = 0.4 // consider 40% as healthy, lower percentile = stricter health/performance check + wakeup = time.Minute * 5 + requestTimeout = time.Second * 10 + DefaultMinPeersPerBin = 4 + DefaultDurPercentile = 0.4 // consider 40% as healthy, lower percentile = stricter duration check + DefaultConnsPercentile = 0.8 // consider 80% as healthy, lower percentile = stricter conns check ) type topologyDriver interface { @@ -66,7 +67,8 @@ func New( warmup time.Duration, mode string, minPeersPerbin int, - percentile float64, + durPercentile float64, + connsPercentile float64, ) *service { metrics := newMetrics() @@ -82,13 +84,13 @@ func New( } s.wg.Add(1) - go s.worker(warmup, mode, minPeersPerbin, percentile) + go s.worker(warmup, mode, minPeersPerbin, durPercentile, connsPercentile) return s } -func (s *service) worker(warmup time.Duration, mode string, minPeersPerbin int, percentile float64) { +func (s *service) worker(warmup time.Duration, mode string, minPeersPerbin int, durPercentile float64, connsPercentile float64) { defer s.wg.Done() select { @@ -99,7 +101,7 @@ func (s *service) worker(warmup time.Duration, mode string, minPeersPerbin int, for { - s.salud(mode, minPeersPerbin, percentile) + s.salud(mode, minPeersPerbin, durPercentile, connsPercentile) select { case <-s.quit: @@ -126,7 +128,7 @@ type peer struct { // salud acquires the status snapshot of every peer and computes an nth percentile of response duration and connected // peer count, the most common storage radius, and the batch commitment, and based on these values, marks peers as unhealthy that fall beyond // the allowed thresholds. 
-func (s *service) salud(mode string, minPeersPerbin int, percentile float64) { +func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, connsPercentile float64) { var ( mtx sync.Mutex @@ -174,8 +176,8 @@ func (s *service) salud(mode string, minPeersPerbin int, percentile float64) { networkRadius, nHoodRadius := s.radius(peers) avgDur := totaldur / float64(len(peers)) - pDur := percentileDur(peers, percentile) - pConns := percentileConns(peers, percentile) + pDur := percentileDur(peers, durPercentile) + pConns := percentileConns(peers, connsPercentile) commitment := commitment(peers) s.metrics.AvgDur.Set(avgDur) @@ -185,7 +187,7 @@ func (s *service) salud(mode string, minPeersPerbin int, percentile float64) { s.metrics.NeighborhoodRadius.Set(float64(nHoodRadius)) s.metrics.Commitment.Set(float64(commitment)) - s.logger.Debug("computed", "average", avgDur, "percentile", percentile, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment) + s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment) for _, peer := range peers { @@ -193,6 +195,7 @@ func (s *service) salud(mode string, minPeersPerbin int, percentile float64) { // every bin should have at least some peers, healthy or not if bins[peer.bin] <= minPeersPerbin { + s.metrics.Healthy.Inc() s.topology.UpdatePeerHealth(peer.addr, true, peer.dur) continue } @@ -200,11 +203,11 @@ func (s *service) salud(mode string, minPeersPerbin int, percentile float64) { if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-1) { s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr) } else if peer.dur.Seconds() > pDur { - s.logger.Debug("dur health failure", "dur", peer.dur, "peer_address", peer.addr) + s.logger.Debug("response duration below 
threshold", "duration", peer.dur, "peer_address", peer.addr) } else if peer.status.ConnectedPeers < pConns { - s.logger.Debug("connections health failure", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr) + s.logger.Debug("connections count below threshold", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr) } else if peer.status.BatchCommitment != commitment { - s.logger.Debug("batch commitment health failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr) + s.logger.Debug("batch commitment check failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr) } else { healthy = true } diff --git a/pkg/salud/salud_test.go b/pkg/salud/salud_test.go index a448c3c3a08..58a5ae38bb6 100644 --- a/pkg/salud/salud_test.go +++ b/pkg/salud/salud_test.go @@ -69,7 +69,7 @@ func TestSalud(t *testing.T) { mockstorer.WithReserveSize(100), ) - service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8) + service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8) err := spinlock.Wait(time.Minute, func() bool { return len(topM.PeersHealth()) == len(peers) @@ -115,7 +115,7 @@ func TestSelfUnhealthyRadius(t *testing.T) { mockstorer.WithReserveSize(100), ) - service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8) + service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8) err := spinlock.Wait(time.Minute, func() bool { return len(topM.PeersHealth()) == len(peers) @@ -148,7 +148,7 @@ func TestSubToRadius(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8) c, unsub := service.SubscribeNetworkStorageRadius() t.Cleanup(unsub) @@ -181,7 +181,7 @@ func TestUnsub(t 
*testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8) c, unsub := service.SubscribeNetworkStorageRadius() unsub()