Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock-free disposal. #107

Merged
merged 4 commits into from
Dec 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions Sources/Atomic.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,123 @@
//

import Foundation
#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import MachO
#endif

/// Represents a finite state machine that can transit from one state to
/// another.
internal protocol AtomicStateProtocol {
associatedtype State: RawRepresentable

/// Try to transit from the expected current state to the specified next
/// state.
///
/// - parameters:
/// - expected: The expected state.
///
/// - returns:
/// `true` if the transition succeeds. `false` otherwise.
func tryTransiting(from expected: State, to next: State) -> Bool
}

/// A simple, generic lock-free finite state machine.
///
/// - warning: `deinitialize` must be called to dispose of the consumed memory.
internal struct UnsafeAtomicState<State: RawRepresentable>: AtomicStateProtocol where State.RawValue == Int32 {
internal typealias Transition = (expected: State, next: State)
#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
private let value: UnsafeMutablePointer<Int32>

/// Create a finite state machine with the specified initial state.
///
/// - parameters:
/// - initial: The desired initial state.
internal init(_ initial: State) {
value = UnsafeMutablePointer<Int32>.allocate(capacity: 1)
value.initialize(to: initial.rawValue)
}

/// Deinitialize the finite state machine.
internal func deinitialize() {
value.deinitialize()
value.deallocate(capacity: 1)
}

/// Compare the current state with the specified state.
///
/// - parameters:
/// - expected: The expected state.
///
/// - returns:
/// `true` if the current state matches the expected state. `false`
/// otherwise.
@inline(__always)
internal func `is`(_ expected: State) -> Bool {
return OSAtomicCompareAndSwap32Barrier(expected.rawValue,
expected.rawValue,
value)
}

/// Try to transit from the expected current state to the specified next
/// state.
///
/// - parameters:
/// - expected: The expected state.
///
/// - returns:
/// `true` if the transition succeeds. `false` otherwise.
@inline(__always)
internal func tryTransiting(from expected: State, to next: State) -> Bool {
return OSAtomicCompareAndSwap32Barrier(expected.rawValue,
next.rawValue,
value)
}
#else
private let value: Atomic<Int32>

/// Create a finite state machine with the specified initial state.
///
/// - parameters:
/// - initial: The desired initial state.
internal init(_ initial: State) {
value = Atomic(initial.rawValue)
}

/// Deinitialize the finite state machine.
internal func deinitialize() {}

/// Compare the current state with the specified state.
///
/// - parameters:
/// - expected: The expected state.
///
/// - returns:
/// `true` if the current state matches the expected state. `false`
/// otherwise.
internal func `is`(_ expected: State) -> Bool {
return value.modify { $0 == expected.rawValue }
}

/// Try to transit from the expected current state to the specified next
/// state.
///
/// - parameters:
/// - expected: The expected state.
///
/// - returns:
/// `true` if the transition succeeds. `false` otherwise.
internal func tryTransiting(from expected: State, to next: State) -> Bool {
return value.modify { value in
if value == expected.rawValue {
value = next.rawValue
return true
}
return false
}
}
#endif
}

final class PosixThreadMutex: NSLocking {
private var mutex = pthread_mutex_t()
Expand Down
119 changes: 82 additions & 37 deletions Sources/Disposable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,33 @@ public protocol Disposable: class {
/// Whether this disposable has been disposed already.
var isDisposed: Bool { get }

/// Method for disposing of resources when appropriate.
/// Disposing of the resources represented by `self`. If `self` has already
/// been disposed of, it does nothing.
///
/// - note: Implementations must issue a memory barrier.
func dispose()
}

/// Represents the state of a disposable.
private enum DisposableState: Int32 {
/// The disposable is active.
case active

/// The disposable has been disposed.
case disposed
}

extension AtomicStateProtocol where State == DisposableState {
/// Try to transit from `active` to `disposed`.
///
/// - returns:
/// `true` if the transition succeeds. `false` otherwise.
@inline(__always)
fileprivate func tryDispose() -> Bool {
return tryTransiting(from: .active, to: .disposed)
}
}

/// A type-erased disposable that forwards operations to an underlying disposable.
public final class AnyDisposable: Disposable {
private let disposable: Disposable
Expand All @@ -36,59 +59,75 @@ public final class AnyDisposable: Disposable {
/// A disposable that only flips `isDisposed` upon disposal, and performs no other
/// work.
public final class SimpleDisposable: Disposable {
private let _isDisposed = Atomic(false)
private var state = UnsafeAtomicState(DisposableState.active)

public var isDisposed: Bool {
return _isDisposed.value
return state.is(.disposed)
}

public init() {}

public func dispose() {
_isDisposed.value = true
_ = state.tryDispose()
}

deinit {
state.deinitialize()
}
}

/// A disposable that will run an action upon disposal.
public final class ActionDisposable: Disposable {
private let action: Atomic<(() -> Void)?>
private var action: (() -> Void)?
private var state: UnsafeAtomicState<DisposableState>

public var isDisposed: Bool {
return action.value == nil
return state.is(.disposed)
}

/// Initialize the disposable to run the given action upon disposal.
///
/// - parameters:
/// - action: A closure to run when calling `dispose()`.
public init(action: @escaping () -> Void) {
self.action = Atomic(action)
self.action = action
self.state = UnsafeAtomicState(DisposableState.active)
}

public func dispose() {
let oldAction = action.swap(nil)
oldAction?()
if state.tryDispose() {
action?()
action = nil
}
}

deinit {
state.deinitialize()
}
}

/// A disposable that will dispose of any number of other disposables.
public final class CompositeDisposable: Disposable {
private let disposables: Atomic<Bag<Disposable>?>
private var state: UnsafeAtomicState<DisposableState>

/// Represents a handle to a disposable previously added to a
/// CompositeDisposable.
public final class DisposableHandle {
private let bagToken: Atomic<RemovalToken?>
private var state: UnsafeAtomicState<DisposableState>
private var bagToken: RemovalToken?
private weak var disposable: CompositeDisposable?

fileprivate static let empty = DisposableHandle()

fileprivate init() {
self.bagToken = Atomic(nil)
self.state = UnsafeAtomicState(.disposed)
self.bagToken = nil
}

fileprivate init(bagToken: RemovalToken, disposable: CompositeDisposable) {
self.bagToken = Atomic(bagToken)
self.state = UnsafeAtomicState(.active)
self.bagToken = bagToken
self.disposable = disposable
}

Expand All @@ -97,16 +136,18 @@ public final class CompositeDisposable: Disposable {
/// - note: This is useful to minimize memory growth, by removing
/// disposables that are no longer needed.
public func remove() {
if let token = bagToken.swap(nil) {
if state.tryDispose(), let token = bagToken {
_ = disposable?.disposables.modify {
$0?.remove(using: token)
}
bagToken = nil
disposable = nil
}
}
}

public var isDisposed: Bool {
return disposables.value == nil
return state.is(.disposed)
}

/// Initialize a `CompositeDisposable` containing the given sequence of
Expand All @@ -125,6 +166,7 @@ public final class CompositeDisposable: Disposable {
}

self.disposables = Atomic(bag)
self.state = UnsafeAtomicState(DisposableState.active)
}

/// Initialize a `CompositeDisposable` containing the given sequence of
Expand All @@ -145,9 +187,11 @@ public final class CompositeDisposable: Disposable {
}

public func dispose() {
if let ds = disposables.swap(nil) {
for d in ds.reversed() {
d.dispose()
if state.tryDispose() {
if let ds = disposables.swap(nil) {
for d in ds {
d.dispose()
}
}
}
}
Expand Down Expand Up @@ -189,6 +233,10 @@ public final class CompositeDisposable: Disposable {
public func add(_ action: @escaping () -> Void) -> DisposableHandle {
return add(ActionDisposable(action: action))
}

deinit {
state.deinitialize()
}
}

/// A disposable that, upon deinitialization, will automatically dispose of
Expand Down Expand Up @@ -216,7 +264,7 @@ public final class ScopedDisposable<InnerDisposable: Disposable>: Disposable {
}

public func dispose() {
innerDisposable.dispose()
return innerDisposable.dispose()
}
}

Expand All @@ -234,15 +282,11 @@ extension ScopedDisposable where InnerDisposable: AnyDisposable {

/// A disposable that will optionally dispose of another disposable.
public final class SerialDisposable: Disposable {
private struct State {
var innerDisposable: Disposable? = nil
var isDisposed = false
}

private let state = Atomic(State())
private let _innerDisposable: Atomic<Disposable?>
private var state: UnsafeAtomicState<DisposableState>

public var isDisposed: Bool {
return state.value.isDisposed
return state.is(.disposed)
}

/// The inner disposable to dispose of.
Expand All @@ -251,18 +295,13 @@ public final class SerialDisposable: Disposable {
/// disposable is automatically disposed.
public var innerDisposable: Disposable? {
get {
return state.value.innerDisposable
return _innerDisposable.value
}

set(d) {
let oldState: State = state.modify { state in
defer { state.innerDisposable = d }
return state
}

oldState.innerDisposable?.dispose()
if oldState.isDisposed {
d?.dispose()
_innerDisposable.swap(d)?.dispose()
if let d = d, isDisposed {
d.dispose()
}
}
}
Expand All @@ -273,12 +312,18 @@ public final class SerialDisposable: Disposable {
/// - parameters:
/// - disposable: Optional disposable.
public init(_ disposable: Disposable? = nil) {
innerDisposable = disposable
self._innerDisposable = Atomic(disposable)
self.state = UnsafeAtomicState(DisposableState.active)
}

public func dispose() {
let orig = state.swap(State(innerDisposable: nil, isDisposed: true))
orig.innerDisposable?.dispose()
if state.tryDispose() {
_innerDisposable.swap(nil)?.dispose()
}
}

deinit {
state.deinitialize()
}
}

Expand Down