Skip to content

Commit

Permalink
Merge pull request #760 from ipfs/marco/drop-stream-reference
Browse files Browse the repository at this point in the history
[skip changelog] fix: Drop stream references on Close/Reset
  • Loading branch information
gammazero authored Dec 18, 2024
2 parents 2a5c7d0 + af8522d commit 3a844a9
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The following emojis are used to highlight certain changes:
- 🛠 `blockstore` and `blockservice`'s `WriteThrough()` option now takes an "enabled" parameter: `WriteThrough(enabled bool)`.
- Replaced unmaintained mock time implementation uses in tests: [from](github.com/benbjohnson/clock) => [to](github.com/filecoin-project/go-clock)
- updated to go-libp2p to [v0.38.0](/~https://github.com/libp2p/go-libp2p/releases/tag/v0.38.0)
- `bitswap/client`: if a libp2p connection has a context, use `context.AfterFunc` to cleanup the connection.


### Removed
Expand Down
72 changes: 57 additions & 15 deletions bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,45 @@ type impl struct {
receivers []Receiver
}

// interfaceWrapper is concrete type that wraps an interface. Necessary because
// atomic.Value needs the same type and can not Store(nil). This indirection
// allows us to store nil.
type interfaceWrapper[T any] struct {
t T
}
type atomicInterface[T any] struct {
iface atomic.Value
}

func (a *atomicInterface[T]) Load() T {
var v T
x := a.iface.Load()
if x != nil {
return x.(interfaceWrapper[T]).t
}
return v
}

func (a *atomicInterface[T]) Store(v T) {
a.iface.Store(interfaceWrapper[T]{v})
}

type streamMessageSender struct {
to peer.ID
stream network.Stream
connected bool
bsnet *impl
opts *MessageSenderOpts
to peer.ID
stream atomicInterface[network.Stream]
bsnet *impl
opts *MessageSenderOpts
}

type HasContext interface {
Context() context.Context
}

// Open a stream to the remote peer
func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, error) {
if s.connected {
return s.stream, nil
stream := s.stream.Load()
if stream != nil {
return stream, nil
}

tctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout)
Expand All @@ -107,30 +134,45 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro
if err != nil {
return nil, err
}
if withCtx, ok := stream.Conn().(HasContext); ok {
context.AfterFunc(withCtx.Context(), func() {
s.stream.Store(nil)
})
}

s.stream = stream
s.connected = true
return s.stream, nil
s.stream.Store(stream)
return stream, nil
}

// Reset the stream
func (s *streamMessageSender) Reset() error {
if s.stream != nil {
err := s.stream.Reset()
s.connected = false
stream := s.stream.Load()
if stream != nil {
err := stream.Reset()
s.stream.Store(nil)
return err
}
return nil
}

// Close the stream
func (s *streamMessageSender) Close() error {
return s.stream.Close()
stream := s.stream.Load()
if stream != nil {
err := stream.Close()
s.stream.Store(nil)
return err
}
return nil
}

// Indicates whether the peer supports HAVE / DONT_HAVE messages
func (s *streamMessageSender) SupportsHave() bool {
return s.bsnet.SupportsHave(s.stream.Protocol())
stream := s.stream.Load()
if stream == nil {
return false
}
return s.bsnet.SupportsHave(stream.Protocol())
}

// Send a message to the peer, attempting multiple times
Expand Down

0 comments on commit 3a844a9

Please sign in to comment.