chore: extra batch listener log and salud tweaks (#4426)
istae authored Oct 26, 2023
1 parent 58ef6f3 commit 4a424d5
Showing 5 changed files with 25 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pkg/node/node.go
@@ -929,7 +929,7 @@ func NewBee(
return nil, fmt.Errorf("status service: %w", err)
}

saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultPercentile)
saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile)
b.saludCloser = saludService

rC, unsub := saludService.SubscribeNetworkStorageRadius()
1 change: 1 addition & 0 deletions pkg/postage/listener/listener.go
@@ -259,6 +259,7 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even
nextExpectedBatchBlock := (lastConfirmedBlock/batchFactor + 1) * batchFactor
remainingBlocks := nextExpectedBatchBlock - lastConfirmedBlock
expectedWaitTime = l.blockTime * time.Duration(remainingBlocks)
l.logger.Debug("sleeping until next block batch", "duration", expectedWaitTime)
} else {
expectedWaitTime = l.backoffTime
}
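The debug line added above reports how long the listener will sleep before the next batch boundary. The wait is plain integer arithmetic: round the last confirmed block up to the next multiple of batchFactor, count the blocks still missing, and multiply by the block time. Below is a standalone sketch of that calculation; batchFactor, the block time, and the sample block number are illustrative values, not the listener's actual configuration.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values only; the real listener takes these from its configuration.
	const batchFactor = 5             // blocks are processed in batches of this size
	const blockTime = 5 * time.Second // assumed chain block time
	var lastConfirmedBlock uint64 = 1_000_003

	// Same arithmetic as the hunk above: round up to the next batch boundary,
	// then wait out the blocks that are still missing.
	nextExpectedBatchBlock := (lastConfirmedBlock/batchFactor + 1) * batchFactor // 1_000_005
	remainingBlocks := nextExpectedBatchBlock - lastConfirmedBlock               // 2
	expectedWaitTime := blockTime * time.Duration(remainingBlocks)               // 10s

	fmt.Printf("sleeping %s until block %d\n", expectedWaitTime, nextExpectedBatchBlock)
}

With these sample numbers the new log line would report a 10s sleep before the listener polls the chain again.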
2 changes: 1 addition & 1 deletion pkg/retrieval/retrieval.go
@@ -120,7 +120,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
}

const (
retrieveChunkTimeout = time.Second * 10
retrieveChunkTimeout = time.Second * 30
preemptiveInterval = time.Second
overDraftRefresh = time.Millisecond * 600
skiplistDur = time.Minute
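The only change in this file raises the per-chunk retrieval timeout from 10 to 30 seconds. As a rough illustration of how a constant like this typically bounds a single retrieval attempt, here is a minimal context.WithTimeout sketch; fetchChunk and its behaviour are invented for the example and are not the retrieval protocol's real code.

package main

import (
	"context"
	"fmt"
	"time"
)

// Mirrors the constant above; everything else in this sketch is hypothetical.
const retrieveChunkTimeout = 30 * time.Second

// fetchChunk stands in for a single retrieval attempt against one peer.
func fetchChunk(ctx context.Context, addr string) ([]byte, error) {
	select {
	case <-time.After(50 * time.Millisecond): // pretend the peer answered quickly
		return []byte("chunk-data"), nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), retrieveChunkTimeout)
	defer cancel()

	data, err := fetchChunk(ctx, "someChunkAddress")
	if err != nil {
		// err is context.DeadlineExceeded when the 30s budget runs out.
		fmt.Println("retrieval failed:", err)
		return
	}
	fmt.Printf("retrieved %d bytes\n", len(data))
}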
33 changes: 18 additions & 15 deletions pkg/salud/salud.go
@@ -24,10 +24,11 @@ import (
const loggerName = "salud"

const (
wakeup = time.Minute
requestTimeout = time.Second * 10
DefaultMinPeersPerBin = 4
DefaultPercentile = 0.4 // consider 40% as healthy, lower percentile = stricter health/performance check
wakeup = time.Minute * 5
requestTimeout = time.Second * 10
DefaultMinPeersPerBin = 4
DefaultDurPercentile = 0.4 // consider 40% as healthy, lower percentile = stricter duration check
DefaultConnsPercentile = 0.8 // consider 80% as healthy, lower percentile = stricter conns check
)

type topologyDriver interface {
@@ -66,7 +67,8 @@ func New(
warmup time.Duration,
mode string,
minPeersPerbin int,
percentile float64,
durPercentile float64,
connsPercentile float64,
) *service {

metrics := newMetrics()
@@ -82,13 +84,13 @@
}

s.wg.Add(1)
go s.worker(warmup, mode, minPeersPerbin, percentile)
go s.worker(warmup, mode, minPeersPerbin, durPercentile, connsPercentile)

return s

}

func (s *service) worker(warmup time.Duration, mode string, minPeersPerbin int, percentile float64) {
func (s *service) worker(warmup time.Duration, mode string, minPeersPerbin int, durPercentile float64, connsPercentile float64) {
defer s.wg.Done()

select {
@@ -99,7 +101,7 @@ func (s *service) worker(warmup time.Duration, mode string, minPeersPerbin int,

for {

s.salud(mode, minPeersPerbin, percentile)
s.salud(mode, minPeersPerbin, durPercentile, connsPercentile)

select {
case <-s.quit:
@@ -126,7 +128,7 @@ type peer struct {
// salud acquires the status snapshot of every peer and computes an nth percentile of the response durations and connected
// peer counts, the most common storage radius, and the batch commitment; based on these values, it marks as unhealthy the peers that fall beyond
// the allowed thresholds.
func (s *service) salud(mode string, minPeersPerbin int, percentile float64) {
func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, connsPercentile float64) {

var (
mtx sync.Mutex
@@ -174,8 +176,8 @@ func (s *service) salud(mode string, minPeersPerbin int, percentile float64) {

networkRadius, nHoodRadius := s.radius(peers)
avgDur := totaldur / float64(len(peers))
pDur := percentileDur(peers, percentile)
pConns := percentileConns(peers, percentile)
pDur := percentileDur(peers, durPercentile)
pConns := percentileConns(peers, connsPercentile)
commitment := commitment(peers)

s.metrics.AvgDur.Set(avgDur)
@@ -185,26 +187,27 @@ func (s *service) salud(mode string, minPeersPerbin int, percentile float64) {
s.metrics.NeighborhoodRadius.Set(float64(nHoodRadius))
s.metrics.Commitment.Set(float64(commitment))

s.logger.Debug("computed", "average", avgDur, "percentile", percentile, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment)
s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment)

for _, peer := range peers {

var healthy bool

// every bin should have at least some peers, healthy or not
if bins[peer.bin] <= minPeersPerbin {
s.metrics.Healthy.Inc()
s.topology.UpdatePeerHealth(peer.addr, true, peer.dur)
continue
}

if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-1) {
s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr)
} else if peer.dur.Seconds() > pDur {
s.logger.Debug("dur health failure", "dur", peer.dur, "peer_address", peer.addr)
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr)
} else if peer.status.ConnectedPeers < pConns {
s.logger.Debug("connections health failure", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr)
s.logger.Debug("connections count below threshold", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr)
} else if peer.status.BatchCommitment != commitment {
s.logger.Debug("batch commitment health failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr)
s.logger.Debug("batch commitment check failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr)
} else {
healthy = true
}
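The renamed constants split the former single health percentile into two cut-offs: DefaultDurPercentile (0.4) for response duration, where lower is better, and DefaultConnsPercentile (0.8) for connected-peer counts, where higher is better. A peer fails the check when its duration exceeds the duration cut-off or its connection count falls below the connections cut-off. The following self-contained sketch shows one way such cut-offs can be computed; the sorting directions and index arithmetic are assumptions chosen to match the comments above, not necessarily the repository's percentileDur and percentileConns implementations.

package main

import (
	"fmt"
	"sort"
	"time"
)

// durCutoff sorts ascending and returns the value at the p-quantile, so that
// roughly a p fraction of peers respond at least this fast; slower peers fail
// the duration check.
func durCutoff(durs []time.Duration, p float64) time.Duration {
	sorted := append([]time.Duration(nil), durs...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	idx := int(float64(len(sorted)) * p)
	if idx >= len(sorted) {
		idx = len(sorted) - 1
	}
	return sorted[idx]
}

// connsCutoff sorts descending, so that roughly a p fraction of peers have at
// least this many connections; peers with fewer connections fail the check.
func connsCutoff(conns []uint64, p float64) uint64 {
	sorted := append([]uint64(nil), conns...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	idx := int(float64(len(sorted)) * p)
	if idx >= len(sorted) {
		idx = len(sorted) - 1
	}
	return sorted[idx]
}

func main() {
	durs := []time.Duration{200 * time.Millisecond, 400 * time.Millisecond, time.Second, 4 * time.Second, 9 * time.Second}
	conns := []uint64{200, 150, 120, 90, 40, 10, 5, 3, 2, 1}

	pDur := durCutoff(durs, 0.4)      // DefaultDurPercentile
	pConns := connsCutoff(conns, 0.8) // DefaultConnsPercentile

	// Mirrors the checks above: dur > pDur or ConnectedPeers < pConns marks a peer unhealthy.
	fmt.Println("duration cut-off:", pDur, "connections cut-off:", pConns)
}

Sorting connections in descending order is what lets the higher percentile (0.8) act as the more lenient threshold, matching the "consider 80% as healthy" comment, while a lower percentile tightens either check.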
8 changes: 4 additions & 4 deletions pkg/salud/salud_test.go
@@ -69,7 +69,7 @@ func TestSalud(t *testing.T) {
mockstorer.WithReserveSize(100),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
@@ -115,7 +115,7 @@ func TestSelfUnhealthyRadius(t *testing.T) {
mockstorer.WithReserveSize(100),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
@@ -148,7 +148,7 @@ func TestSubToRadius(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)

c, unsub := service.SubscribeNetworkStorageRadius()
t.Cleanup(unsub)
@@ -181,7 +181,7 @@ func TestUnsub(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)

c, unsub := service.SubscribeNetworkStorageRadius()
unsub()
