chore: extra batch listener log and salud tweaks #4426

Merged (5 commits) on Oct 26, 2023

2 changes: 1 addition & 1 deletion pkg/node/node.go
@@ -929,7 +929,7 @@ func NewBee(
return nil, fmt.Errorf("status service: %w", err)
}

- saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultPercentile)
+ saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile)
b.saludCloser = saludService

rC, unsub := saludService.SubscribeNetworkStorageRadius()
1 change: 1 addition & 0 deletions pkg/postage/listener/listener.go
@@ -259,6 +259,7 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even
nextExpectedBatchBlock := (lastConfirmedBlock/batchFactor + 1) * batchFactor
remainingBlocks := nextExpectedBatchBlock - lastConfirmedBlock
expectedWaitTime = l.blockTime * time.Duration(remainingBlocks)
l.logger.Debug("sleeping until next block batch", "duration", expectedWaitTime)
} else {
expectedWaitTime = l.backoffTime
}
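For context, the added log reports the wait computed just above it: the listener rounds the last confirmed block up to the next multiple of batchFactor and sleeps for the remaining blocks times the block time. A minimal, self-contained sketch of that arithmetic with illustrative values (the constants below are assumptions for the example, not the listener's actual configuration):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values only; the real listener takes these from its setup.
	const batchFactor = 5             // events are processed in blocks divisible by this factor
	blockTime := 5 * time.Second      // assumed chain block time
	lastConfirmedBlock := uint64(103) // example last confirmed block height

	// Round up to the next block height divisible by batchFactor: (103/5+1)*5 = 105.
	nextExpectedBatchBlock := (lastConfirmedBlock/batchFactor + 1) * batchFactor
	remainingBlocks := nextExpectedBatchBlock - lastConfirmedBlock // 2
	expectedWaitTime := blockTime * time.Duration(remainingBlocks) // 10s

	fmt.Println("sleeping until next block batch", expectedWaitTime)
}
```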
2 changes: 1 addition & 1 deletion pkg/retrieval/retrieval.go
@@ -120,7 +120,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
}

const (
- retrieveChunkTimeout = time.Second * 10
+ retrieveChunkTimeout = time.Second * 30
preemptiveInterval = time.Second
overDraftRefresh = time.Millisecond * 600
skiplistDur = time.Minute
33 changes: 18 additions & 15 deletions pkg/salud/salud.go
@@ -24,10 +24,11 @@ import (
const loggerName = "salud"

const (
- wakeup = time.Minute
- requestTimeout = time.Second * 10
- DefaultMinPeersPerBin = 4
- DefaultPercentile = 0.4 // consider 40% as healthy, lower percentile = stricter health/performance check
+ wakeup = time.Minute * 5
+ requestTimeout = time.Second * 10
+ DefaultMinPeersPerBin = 4
+ DefaultDurPercentile = 0.4 // consider 40% as healthy, lower percentile = stricter duration check
+ DefaultConnsPercentile = 0.8 // consider 80% as healthy, lower percentile = stricter conns check
)

type topologyDriver interface {
@@ -66,7 +67,8 @@ func New(
warmup time.Duration,
mode string,
minPeersPerbin int,
- percentile float64,
+ durPercentile float64,
+ connsPercentile float64,
) *service {

metrics := newMetrics()
@@ -82,13 +84,13 @@ }
}

s.wg.Add(1)
- go s.worker(warmup, mode, minPeersPerbin, percentile)
+ go s.worker(warmup, mode, minPeersPerbin, durPercentile, connsPercentile)

return s

}

- func (s *service) worker(warmup time.Duration, mode string, minPeersPerbin int, percentile float64) {
+ func (s *service) worker(warmup time.Duration, mode string, minPeersPerbin int, durPercentile float64, connsPercentile float64) {
defer s.wg.Done()

select {
@@ -99,7 +101,7 @@ func (s *service) worker(warmup time.Duration, mode string, minPeersPerbin int,

for {

- s.salud(mode, minPeersPerbin, percentile)
+ s.salud(mode, minPeersPerbin, durPercentile, connsPercentile)

select {
case <-s.quit:
@@ -126,7 +128,7 @@ type peer struct {
// salud acquires the status snapshot of every peer and computes an nth percentile of response duration and connected
// peers count, the most common storage radius, and the batch commitment, and based on these values, marks peers as unhealthy that fall beyond
// the allowed thresholds.
- func (s *service) salud(mode string, minPeersPerbin int, percentile float64) {
+ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, connsPercentile float64) {

var (
mtx sync.Mutex
@@ -174,8 +176,8 @@ func (s *service) salud(mode string, minPeersPerbin int, percentile float64) {

networkRadius, nHoodRadius := s.radius(peers)
avgDur := totaldur / float64(len(peers))
- pDur := percentileDur(peers, percentile)
- pConns := percentileConns(peers, percentile)
+ pDur := percentileDur(peers, durPercentile)
+ pConns := percentileConns(peers, connsPercentile)
commitment := commitment(peers)

s.metrics.AvgDur.Set(avgDur)
@@ -185,26 +187,27 @@ func (s *service) salud(mode string, minPeersPerbin int, percentile float64) {
s.metrics.NeighborhoodRadius.Set(float64(nHoodRadius))
s.metrics.Commitment.Set(float64(commitment))

s.logger.Debug("computed", "average", avgDur, "percentile", percentile, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment)
s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment)

for _, peer := range peers {

var healthy bool

+ // every bin should have at least some peers, healthy or not
if bins[peer.bin] <= minPeersPerbin {
s.metrics.Healthy.Inc()
s.topology.UpdatePeerHealth(peer.addr, true, peer.dur)
continue
}

if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-1) {
s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr)
} else if peer.dur.Seconds() > pDur {
s.logger.Debug("dur health failure", "dur", peer.dur, "peer_address", peer.addr)
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr)
} else if peer.status.ConnectedPeers < pConns {
s.logger.Debug("connections health failure", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr)
s.logger.Debug("connections count below threshold", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr)
} else if peer.status.BatchCommitment != commitment {
s.logger.Debug("batch commitment health failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr)
s.logger.Debug("batch commitment check failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr)
} else {
healthy = true
}
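To make the threshold logic above concrete: pDur and pConns are percentile cut-offs computed across all sampled peers, and a peer that is slower than pDur, has fewer connections than pConns, or fails the radius or batch-commitment checks is marked unhealthy. Below is a rough standalone sketch of the duration cut-off only, assuming a simple sort-and-index percentile; the durCutoff helper is illustrative and may differ from the package's percentileDur:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// durCutoff returns the duration (in seconds) found at the given percentile of the
// ascending-sorted samples. Lower percentile means a lower cut-off, i.e. a stricter
// duration check. Illustrative sketch only; boundary handling is an assumption.
func durCutoff(durs []time.Duration, percentile float64) float64 {
	secs := make([]float64, len(durs))
	for i, d := range durs {
		secs[i] = d.Seconds()
	}
	sort.Float64s(secs)
	return secs[int(float64(len(secs))*percentile)]
}

func main() {
	samples := []time.Duration{
		200 * time.Millisecond,
		350 * time.Millisecond,
		900 * time.Millisecond,
		4 * time.Second,
		8 * time.Second,
	}
	pDur := durCutoff(samples, 0.4) // index int(5*0.4) = 2 -> cut-off 0.9s
	for _, d := range samples {
		// Mirrors the check above: a peer fails the duration check when d.Seconds() > pDur.
		fmt.Printf("dur=%v passesDurationCheck=%v\n", d, d.Seconds() <= pDur)
	}
}
```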
8 changes: 4 additions & 4 deletions pkg/salud/salud_test.go
@@ -69,7 +69,7 @@ func TestSalud(t *testing.T) {
mockstorer.WithReserveSize(100),
)

- service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8)
+ service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
@@ -115,7 +115,7 @@ func TestSelfUnhealthyRadius(t *testing.T) {
mockstorer.WithReserveSize(100),
)

- service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8)
+ service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
@@ -148,7 +148,7 @@ func TestSubToRadius(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)

c, unsub := service.SubscribeNetworkStorageRadius()
t.Cleanup(unsub)
@@ -181,7 +181,7 @@ func TestUnsub(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)

c, unsub := service.SubscribeNetworkStorageRadius()
unsub()