Skip to content

Commit

Permalink
relay-check-parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Sep 17, 2022
1 parent 4d3941c commit 4e70408
Showing 1 changed file with 27 additions and 66 deletions.
93 changes: 27 additions & 66 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,43 +200,7 @@ func (m *BoostService) handleRoot(w http.ResponseWriter, req *http.Request) {
// handleStatus sends calls to the status endpoint of every relay.
// It returns OK if at least one returned OK, and returns error otherwise.
func (m *BoostService) handleStatus(w http.ResponseWriter, req *http.Request) {
if !m.relayCheck {
m.respondOK(w, nilResponse)
return
}

// If relayCheck is enabled, make sure at least 1 relay returns success
var wg sync.WaitGroup
var numSuccessRequestsToRelay uint32
ua := UserAgent(req.Header.Get("User-Agent"))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, r := range m.relays {
wg.Add(1)

go func(relay RelayEntry) {
defer wg.Done()
url := relay.GetURI(pathStatus)
log := m.log.WithField("url", url)
log.Debug("Checking relay status")

_, err := SendHTTPRequest(ctx, m.httpClientGetHeader, http.MethodGet, url, ua, nil, nil)
if err != nil && ctx.Err() != context.Canceled {
log.WithError(err).Error("failed to retrieve relay status")
return
}

// Success: increase counter and cancel all pending requests to other relays
atomic.AddUint32(&numSuccessRequestsToRelay, 1)
cancel()
}(r)
}

// At the end, wait for every routine and return status according to relay's ones.
wg.Wait()

if numSuccessRequestsToRelay > 0 {
if !m.relayCheck || m.CheckRelays() > 0 {
m.respondOK(w, nilResponse)
} else {
m.respondError(w, http.StatusServiceUnavailable, "all relays are unavailable")
Expand Down Expand Up @@ -555,41 +519,38 @@ func (m *BoostService) handleGetPayload(w http.ResponseWriter, req *http.Request

// CheckRelays sends a request to each one of the relays previously registered to get their status
func (m *BoostService) CheckRelays() int {
wg := sync.WaitGroup{}
wg.Add(len(m.relays) + 1)

retChan := make(chan bool, len(m.relays))
defer close(retChan)
var wg sync.WaitGroup
var numSuccessRequestsToRelay uint32

for _, relay := range m.relays {
go func(c chan<- bool, relay RelayEntry) {
defer wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

m.log.WithField("relay", relay.String()).Info("Checking relay")
for _, r := range m.relays {
wg.Add(1)

go func(relay RelayEntry) {
defer wg.Done()
url := relay.GetURI(pathStatus)
_, err := SendHTTPRequest(context.Background(), m.httpClientGetHeader, http.MethodGet, url, "", nil, nil)
if err != nil {
m.log.WithError(err).WithField("relay", relay.String()).Error("relay check failed")
c <- false
} else {
c <- true
}
}(retChan, relay)
}

numHealthyRelays := 0
go func(c <-chan bool) {
defer wg.Done()
log := m.log.WithField("url", url)
log.Debug("Checking relay status")

for i := 0; i < len(m.relays); i++ {
if ret := <-c; ret {
numHealthyRelays++
code, err := SendHTTPRequest(ctx, m.httpClientGetHeader, http.MethodGet, url, "", nil, nil)
if err != nil && ctx.Err() != context.Canceled {
log.WithError(err).Error("relay status error - request failed")
return
}
if code != http.StatusOK {
log.Errorf("relay status error - unexpected status code %d", code)
return
}
}
}(retChan)

wg.Wait()
// Success: increase counter and cancel all pending requests to other relays
atomic.AddUint32(&numSuccessRequestsToRelay, 1)
cancel()
}(r)
}

return numHealthyRelays
// At the end, wait for every routine and return status according to relay's ones.
wg.Wait()
return int(numSuccessRequestsToRelay)
}

0 comments on commit 4e70408

Please sign in to comment.