Skip to content

Commit

Permalink
Restore implementations of Subjects to pre-pr state
Browse files Browse the repository at this point in the history
  • Loading branch information
isaac-weisberg committed Feb 25, 2025
1 parent e5dca1c commit 28c872d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 26 deletions.
8 changes: 4 additions & 4 deletions RxSwift/Subjects/BehaviorSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,21 @@ public final class BehaviorSubject<Element>
/// - parameter observer: Observer to subscribe to the subject.
/// - returns: Disposable object that can be used to unsubscribe the observer from the subject.
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
lock.lock()
self.lock.performLocked { self.synchronized_subscribe(observer) }
}

func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if self.isDisposed {
lock.unlock()
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}

if let stoppedEvent = self.stoppedEvent {
lock.unlock()
observer.on(stoppedEvent)
return Disposables.create()
}

let key = self.observers.insert(observer.on)
lock.unlock()
observer.on(.next(self.element))

return SubscriptionDisposable(owner: self, key: key)
Expand Down
8 changes: 4 additions & 4 deletions RxSwift/Subjects/PublishSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,21 @@ public final class PublishSubject<Element>
- returns: Disposable object that can be used to unsubscribe the observer from the subject.
*/
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
lock.lock()
self.lock.performLocked { self.synchronized_subscribe(observer) }
}

func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self.stoppedEvent {
lock.unlock()
observer.on(stoppedEvent)
return Disposables.create()
}

if self.isDisposed {
lock.unlock()
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}

let key = self.observers.insert(observer.on)
lock.unlock()
return SubscriptionDisposable(owner: self, key: key)
}

Expand Down
29 changes: 11 additions & 18 deletions RxSwift/Subjects/ReplaySubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private class ReplayBufferBase<Element>
rxAbstractMethod()
}

func getEventsToReplay() -> [Event<Element>] {
func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
rxAbstractMethod()
}

Expand Down Expand Up @@ -140,30 +140,24 @@ private class ReplayBufferBase<Element>
}

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
lock.lock()
self.lock.performLocked { self.synchronized_subscribe(observer) }
}

func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if self.isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}

let anyObserver = observer.asObserver()

self.replayBuffer(anyObserver)
if let stoppedEvent = self.stoppedEvent {
let eventsToReplay = self.getEventsToReplay()
lock.unlock()
for event in eventsToReplay {
observer.on(event)
}
observer.on(stoppedEvent)
return Disposables.create()
}
else {
let key = self.observers.insert(observer.on)
let eventsToReplay = self.getEventsToReplay()
lock.unlock()
for event in eventsToReplay {
observer.on(event)
}
return SubscriptionDisposable(owner: self, key: key)
}
}
Expand Down Expand Up @@ -211,11 +205,10 @@ private final class ReplayOne<Element> : ReplayBufferBase<Element> {
self.value = value
}

override func getEventsToReplay() -> [Event<Element>] {
override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
if let value = self.value {
return [.next(value)]
observer.on(.next(value))
}
return []
}

override func synchronized_dispose() {
Expand All @@ -235,9 +228,9 @@ private class ReplayManyBase<Element>: ReplayBufferBase<Element> {
self.queue.enqueue(value)
}

override func getEventsToReplay() -> [Event<Element>] {
return queue.map { element in
Event.next(element)
override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
for item in self.queue {
observer.on(.next(item))
}
}

Expand Down

0 comments on commit 28c872d

Please sign in to comment.