Skip to content

Commit

Permalink
relay-check-parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Sep 17, 2022
1 parent 2668a7a commit bf7caa7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 70 deletions.
93 changes: 27 additions & 66 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,43 +200,7 @@ func (m *BoostService) handleRoot(w http.ResponseWriter, req *http.Request) {
// handleStatus sends calls to the status endpoint of every relay.
// It returns OK if at least one returned OK, and returns error otherwise.
func (m *BoostService) handleStatus(w http.ResponseWriter, req *http.Request) {
if !m.relayCheck {
m.respondOK(w, nilResponse)
return
}

// If relayCheck is enabled, make sure at least 1 relay returns success
var wg sync.WaitGroup
var numSuccessRequestsToRelay uint32
ua := UserAgent(req.Header.Get("User-Agent"))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, r := range m.relays {
wg.Add(1)

go func(relay RelayEntry) {
defer wg.Done()
url := relay.GetURI(pathStatus)
log := m.log.WithField("url", url)
log.Debug("Checking relay status")

_, err := SendHTTPRequest(ctx, m.httpClientGetHeader, http.MethodGet, url, ua, nil, nil)
if err != nil && ctx.Err() != context.Canceled {
log.WithError(err).Error("failed to retrieve relay status")
return
}

// Success: increase counter and cancel all pending requests to other relays
atomic.AddUint32(&numSuccessRequestsToRelay, 1)
cancel()
}(r)
}

// At the end, wait for every routine and return status according to relay's ones.
wg.Wait()

if numSuccessRequestsToRelay > 0 {
if !m.relayCheck || m.CheckRelays() > 0 {
m.respondOK(w, nilResponse)
} else {
m.respondError(w, http.StatusServiceUnavailable, "all relays are unavailable")
Expand Down Expand Up @@ -555,41 +519,38 @@ func (m *BoostService) handleGetPayload(w http.ResponseWriter, req *http.Request

// CheckRelays sends a request to each one of the relays previously registered to get their status
func (m *BoostService) CheckRelays() int {
wg := sync.WaitGroup{}
wg.Add(len(m.relays) + 1)

retChan := make(chan bool, len(m.relays))
defer close(retChan)
var wg sync.WaitGroup
var numSuccessRequestsToRelay uint32

for _, relay := range m.relays {
go func(c chan<- bool, relay RelayEntry) {
defer wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

m.log.WithField("relay", relay.String()).Info("Checking relay")
for _, r := range m.relays {
wg.Add(1)

go func(relay RelayEntry) {
defer wg.Done()
url := relay.GetURI(pathStatus)
_, err := SendHTTPRequest(context.Background(), m.httpClientGetHeader, http.MethodGet, url, "", nil, nil)
if err != nil {
m.log.WithError(err).WithField("relay", relay.String()).Error("relay check failed")
c <- false
} else {
c <- true
}
}(retChan, relay)
}

numHealthyRelays := 0
go func(c <-chan bool) {
defer wg.Done()
log := m.log.WithField("url", url)
log.Debug("Checking relay status")

for i := 0; i < len(m.relays); i++ {
if ret := <-c; ret {
numHealthyRelays++
code, err := SendHTTPRequest(ctx, m.httpClientGetHeader, http.MethodGet, url, "", nil, nil)
if err != nil && ctx.Err() != context.Canceled {
log.WithError(err).Error("relay status error - request failed")
return
}
if code != http.StatusOK {
log.Errorf("relay status error - unexpected status code %d", code)
return
}
}
}(retChan)

wg.Wait()
// Success: increase counter and cancel all pending requests to other relays
atomic.AddUint32(&numSuccessRequestsToRelay, 1)
cancel()
}(r)
}

return numHealthyRelays
// At the end, wait for every routine and return status according to relay's ones.
wg.Wait()
return int(numSuccessRequestsToRelay)
}
16 changes: 12 additions & 4 deletions server/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,20 +499,28 @@ func TestGetPayload(t *testing.T) {
}

func TestCheckRelays(t *testing.T) {
t.Run("At least one relay is okay", func(t *testing.T) {
backend := newTestBackend(t, 3, time.Second)
t.Run("One relay is okay", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
numHealthyRelays := backend.boost.CheckRelays()
require.Equal(t, 3, numHealthyRelays)
require.Equal(t, 1, numHealthyRelays)
})

t.Run("Every relays are down", func(t *testing.T) {
t.Run("One relay is down", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
backend.relays[0].Server.Close()

numHealthyRelays := backend.boost.CheckRelays()
require.Equal(t, 0, numHealthyRelays)
})

t.Run("One relays is up, one down", func(t *testing.T) {
backend := newTestBackend(t, 2, time.Second)
backend.relays[0].Server.Close()

numHealthyRelays := backend.boost.CheckRelays()
require.Equal(t, 1, numHealthyRelays)
})

t.Run("Should not follow redirects", func(t *testing.T) {
backend := newTestBackend(t, 1, time.Second)
redirectAddress := backend.relays[0].Server.URL
Expand Down

0 comments on commit bf7caa7

Please sign in to comment.