Skip to content

Commit

Permalink
feat: fixed cpu hotloop in dialwrap (#406)
Browse files Browse the repository at this point in the history
* feat: fixed cpu hotloop in dialwrap

* fixed data race
  • Loading branch information
Ice3man543 authored Jan 21, 2025
1 parent 840efa6 commit f8e8673
Showing 1 changed file with 39 additions and 25 deletions.
64 changes: 39 additions & 25 deletions fastdialer/utils/dialwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type DialWrap struct {
mu sync.RWMutex
// error returned by first connection
err error

firstConnCond *sync.Cond
}

// NewDialWrap creates a new dial wrap instance and returns it.
Expand Down Expand Up @@ -98,6 +100,7 @@ func NewDialWrap(dialer *net.Dialer, ips []string, network, address, port string
network: network,
address: address,
port: port,
firstConnCond: sync.NewCond(&sync.Mutex{}),
}, nil
}

Expand Down Expand Up @@ -128,8 +131,12 @@ func (d *DialWrap) DialContext(ctx context.Context, _ string, _ string) (net.Con
case <-d.hasCompletedFirstConnection(ctx):
// if first connection completed and it failed due to other reasons
// and not due to context cancellation
if d.err != nil && !errkit.Is(d.err, ErrInflightCancel) && !errkit.Is(d.err, context.Canceled) {
return nil, d.err
d.firstConnCond.L.Lock()
err := d.err
d.firstConnCond.L.Unlock()

if err != nil && !errkit.Is(err, ErrInflightCancel) && !errkit.Is(err, context.Canceled) {
return nil, err
}
return d.dial(ctx)
case <-ctx.Done():
Expand All @@ -143,27 +150,31 @@ func (d *DialWrap) doFirstConnection(ctx context.Context) chan *dialResult {
}
d.busyFirstConnection.Store(true)
now := time.Now()
defer func() {
d.SetFirstConnectionDuration(time.Since(now))
}()

size := len(d.ipv4) + len(d.ipv6)
ch := make(chan *dialResult, size)

// dial parallel
conns, err := d.dialAllParallel(ctx)
defer func() {
go func() {
defer close(ch)

conns, err := d.dialAllParallel(ctx)

d.firstConnCond.L.Lock()
d.SetFirstConnectionDuration(time.Since(now))
d.completedFirstConnection.Store(true)
close(ch)
}()
if err != nil {
d.firstConnCond.Broadcast()
d.err = err
ch <- &dialResult{error: err}
return ch
}
for _, conn := range conns {
ch <- conn
}
d.firstConnCond.L.Unlock()

if err != nil {
ch <- &dialResult{error: err}
return
}
for _, conn := range conns {
ch <- conn
}
}()
return ch
}

Expand All @@ -172,19 +183,22 @@ func (d *DialWrap) hasCompletedFirstConnection(ctx context.Context) chan struct{

go func() {
defer close(ch)
for {
if d.completedFirstConnection.Load() {
ch <- struct{}{}
return
}
select {
case <-ctx.Done():

// Check immediately first
if d.completedFirstConnection.Load() {
return
}

d.firstConnCond.L.Lock()
defer d.firstConnCond.L.Unlock()

for !d.completedFirstConnection.Load() {
if ctx.Err() != nil {
return
default:
}
d.firstConnCond.Wait()
}
}()

return ch
}

Expand Down

0 comments on commit f8e8673

Please sign in to comment.