diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 5cb5ebd7ed..23e666896d 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -17965,6 +17965,11 @@ public final Flowable> window( * Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The outer Publisher of this operator does not support backpressure as it uses a {@code boundary} Publisher to control data @@ -17995,6 +18000,11 @@ public final Flowable> window(Publisher boundaryIndicator) { * Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The outer Publisher of this operator does not support backpressure as it uses a {@code boundary} Publisher to control data @@ -18031,6 +18041,11 @@ public final Flowable> window(Publisher boundaryIndicator, in * {@code closingSelector} emits an item. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The outer Publisher of this operator doesn't support backpressure because the emission of new @@ -18068,6 +18083,11 @@ public final Flowable> window( * {@code closingSelector} emits an item. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The outer Publisher of this operator doesn't support backpressure because the emission of new diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 505513c160..ce0815477f 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -14536,6 +14536,11 @@ public final Observable> window(long count, long skip, int bufferS * current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.
@@ -14564,6 +14569,11 @@ public final Observable> window(long timespan, long timeskip, Time * current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -14594,6 +14604,11 @@ public final Observable> window(long timespan, long timeskip, Time * current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -14630,6 +14645,11 @@ public final Observable> window(long timespan, long timeskip, Time * ObservableSource emits the current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.
@@ -14658,6 +14678,11 @@ public final Observable> window(long timespan, TimeUnit unit) { * emits the current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.
@@ -14690,6 +14715,11 @@ public final Observable> window(long timespan, TimeUnit unit, * emits the current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.
@@ -14723,6 +14753,11 @@ public final Observable> window(long timespan, TimeUnit unit, * ObservableSource emits the current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -14754,6 +14789,11 @@ public final Observable> window(long timespan, TimeUnit unit, * current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -14788,6 +14828,11 @@ public final Observable> window(long timespan, TimeUnit unit, * current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -14824,6 +14869,11 @@ public final Observable> window(long timespan, TimeUnit unit, * current window and propagates the notification from the source ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -14865,6 +14915,11 @@ public final Observable> window( * ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
@@ -14891,6 +14946,11 @@ public final Observable> window(ObservableSource boundary) * ObservableSource. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
@@ -14922,6 +14982,11 @@ public final Observable> window(ObservableSource boundary, * {@code closingIndicator} emits an item. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
@@ -14953,6 +15018,11 @@ public final Observable> window( * {@code closingIndicator} emits an item. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Scheduler:
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java index ba43da6d84..2e07d3eec8 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java @@ -240,7 +240,11 @@ void drain() { if (emitted != requested.get()) { emitted++; - downstream.onNext(w); + FlowableWindowSubscribeIntercept intercept = new FlowableWindowSubscribeIntercept(w); + downstream.onNext(intercept); + if (intercept.tryAbandon()) { + w.onComplete(); + } } else { SubscriptionHelper.cancel(upstream); boundarySubscriber.dispose(); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java index 573f0257e9..63ee8ccf3c 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java @@ -18,70 +18,85 @@ import org.reactivestreams.*; -import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.*; -import io.reactivex.rxjava3.exceptions.MissingBackpressureException; +import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.Function; -import io.reactivex.rxjava3.internal.disposables.DisposableHelper; import io.reactivex.rxjava3.internal.functions.ObjectHelper; import io.reactivex.rxjava3.internal.fuseable.SimplePlainQueue; import io.reactivex.rxjava3.internal.queue.MpscLinkedQueue; -import io.reactivex.rxjava3.internal.subscribers.QueueDrainSubscriber; import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; -import io.reactivex.rxjava3.internal.util.NotificationLite; +import io.reactivex.rxjava3.internal.util.*; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.processors.UnicastProcessor; -import io.reactivex.rxjava3.subscribers.*; public final class FlowableWindowBoundarySelector extends AbstractFlowableWithUpstream> { final Publisher open; - final Function> close; + final Function> closingIndicator; final int bufferSize; public FlowableWindowBoundarySelector( Flowable source, - Publisher open, Function> close, + Publisher open, Function> closingIndicator, int bufferSize) { super(source); this.open = open; - this.close = close; + this.closingIndicator = closingIndicator; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Subscriber> s) { source.subscribe(new WindowBoundaryMainSubscriber( - new SerializedSubscriber>(s), - open, close, bufferSize)); + s, open, closingIndicator, bufferSize)); } static final class WindowBoundaryMainSubscriber - extends QueueDrainSubscriber> - implements Subscription { + extends AtomicInteger + implements FlowableSubscriber, Subscription, Runnable { + private static final long serialVersionUID = 8646217640096099753L; + + final Subscriber> downstream; final Publisher open; - final Function> close; + final Function> closingIndicator; final int bufferSize; final CompositeDisposable resources; - Subscription upstream; + final WindowStartSubscriber startSubscriber; + + final List> windows; + + final SimplePlainQueue queue; + + final AtomicLong windowCount; - final AtomicReference boundary = new AtomicReference(); + final AtomicBoolean downstreamCancelled; - final List> ws; + final AtomicLong requested; + long emitted; - final AtomicLong windows = new AtomicLong(); + volatile boolean upstreamCanceled; - final AtomicBoolean stopWindows = new AtomicBoolean(); + volatile boolean upstreamDone; + volatile boolean openDone; + final AtomicThrowable error; + + Subscription upstream; WindowBoundaryMainSubscriber(Subscriber> actual, - Publisher open, Function> close, int bufferSize) { - super(actual, new MpscLinkedQueue()); + Publisher open, Function> closingIndicator, int bufferSize) { + this.downstream = actual; + this.queue = new MpscLinkedQueue(); this.open = open; - this.close = close; + this.closingIndicator = closingIndicator; this.bufferSize = bufferSize; this.resources = new CompositeDisposable(); - this.ws = new ArrayList>(); - windows.lazySet(1); + this.windows = new ArrayList>(); + this.windowCount = new AtomicLong(1L); + this.downstreamCancelled = new AtomicBoolean(); + this.error = new AtomicThrowable(); + this.startSubscriber = new WindowStartSubscriber(this); + this.requested = new AtomicLong(); } @Override @@ -91,297 +106,340 @@ public void onSubscribe(Subscription s) { downstream.onSubscribe(this); - if (stopWindows.get()) { - return; - } - - OperatorWindowBoundaryOpenSubscriber os = new OperatorWindowBoundaryOpenSubscriber(this); + open.subscribe(startSubscriber); - if (boundary.compareAndSet(null, os)) { - s.request(Long.MAX_VALUE); - open.subscribe(os); - } + s.request(Long.MAX_VALUE); } } @Override public void onNext(T t) { - if (done) { - return; - } - if (fastEnter()) { - for (UnicastProcessor w : ws) { - w.onNext(t); - } - if (leave(-1) == 0) { - return; - } - } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; - } - } - drainLoop(); + queue.offer(t); + drain(); } @Override public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - error = t; - done = true; - - if (enter()) { - drainLoop(); - } - - if (windows.decrementAndGet() == 0) { - resources.dispose(); + startSubscriber.cancel(); + resources.dispose(); + if (error.tryAddThrowableOrReport(t)) { + upstreamDone = true; + drain(); } - - downstream.onError(t); } @Override public void onComplete() { - if (done) { - return; + startSubscriber.cancel(); + resources.dispose(); + upstreamDone = true; + drain(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); } - done = true; + } - if (enter()) { - drainLoop(); + @Override + public void cancel() { + if (downstreamCancelled.compareAndSet(false, true)) { + if (windowCount.decrementAndGet() == 0) { + upstream.cancel(); + startSubscriber.cancel(); + resources.dispose(); + error.tryTerminateAndReport(); + upstreamCanceled = true; + drain(); + } else { + startSubscriber.cancel(); + } } + } - if (windows.decrementAndGet() == 0) { + @Override + public void run() { + if (windowCount.decrementAndGet() == 0) { + upstream.cancel(); + startSubscriber.cancel(); resources.dispose(); + error.tryTerminateAndReport(); + upstreamCanceled = true; + drain(); } + } - downstream.onComplete(); + void open(B startValue) { + queue.offer(new WindowStartItem(startValue)); + drain(); } - void error(Throwable t) { + void openError(Throwable t) { upstream.cancel(); resources.dispose(); - DisposableHelper.dispose(boundary); - - downstream.onError(t); + if (error.tryAddThrowableOrReport(t)) { + upstreamDone = true; + drain(); + } } - @Override - public void request(long n) { - requested(n); + void openComplete() { + openDone = true; + drain(); } - @Override - public void cancel() { - if (stopWindows.compareAndSet(false, true)) { - DisposableHelper.dispose(boundary); - if (windows.decrementAndGet() == 0) { - upstream.cancel(); - } - } + void close(WindowEndSubscriberIntercept what) { + queue.offer(what); + drain(); } - void dispose() { + void closeError(Throwable t) { + upstream.cancel(); + startSubscriber.cancel(); resources.dispose(); - DisposableHelper.dispose(boundary); + if (error.tryAddThrowableOrReport(t)) { + upstreamDone = true; + drain(); + } } - void drainLoop() { - final SimplePlainQueue q = queue; - final Subscriber> a = downstream; - final List> ws = this.ws; + void drain() { + if (getAndIncrement() != 0) { + return; + } + int missed = 1; + final Subscriber> downstream = this.downstream; + final SimplePlainQueue queue = this.queue; + final List> windows = this.windows; for (;;) { - - for (;;) { - boolean d = done; - Object o = q.poll(); - - boolean empty = o == null; - - if (d && empty) { - dispose(); - Throwable e = error; - if (e != null) { - for (UnicastProcessor w : ws) { - w.onError(e); - } - } else { - for (UnicastProcessor w : ws) { - w.onComplete(); - } + if (upstreamCanceled) { + queue.clear(); + windows.clear(); + } else { + boolean isDone = upstreamDone; + Object o = queue.poll(); + boolean isEmpty = o == null; + + if (isDone) { + if (isEmpty || error.get() != null) { + terminateDownstream(downstream); + upstreamCanceled = true; + continue; } - ws.clear(); - return; - } - - if (empty) { - break; } - if (o instanceof WindowOperation) { - @SuppressWarnings("unchecked") - WindowOperation wo = (WindowOperation) o; - - UnicastProcessor w = wo.w; - if (w != null) { - if (ws.remove(wo.w)) { - wo.w.onComplete(); - - if (windows.decrementAndGet() == 0) { - dispose(); - return; + if (!isEmpty) { + if (o instanceof WindowStartItem) { + if (!downstreamCancelled.get()) { + long emitted = this.emitted; + if (requested.get() != emitted) { + this.emitted = ++emitted; + + @SuppressWarnings("unchecked") + B startItem = ((WindowStartItem)o).item; + + Publisher endSource; + try { + endSource = ObjectHelper.requireNonNull(closingIndicator.apply(startItem), "The closingIndicator returned a null Publisher"); + } catch (Throwable ex) { + upstream.cancel(); + startSubscriber.cancel(); + resources.dispose(); + Exceptions.throwIfFatal(ex); + error.tryAddThrowableOrReport(ex); + upstreamDone = true; + continue; + } + + windowCount.getAndIncrement(); + UnicastProcessor newWindow = UnicastProcessor.create(bufferSize, this); + WindowEndSubscriberIntercept endSubscriber = new WindowEndSubscriberIntercept(this, newWindow); + + downstream.onNext(endSubscriber); + + if (endSubscriber.tryAbandon()) { + newWindow.onComplete(); + } else { + windows.add(newWindow); + resources.add(endSubscriber); + endSource.subscribe(endSubscriber); + } + } else { + upstream.cancel(); + startSubscriber.cancel(); + resources.dispose(); + error.tryAddThrowableOrReport(new MissingBackpressureException(FlowableWindowTimed.missingBackpressureMessage(emitted))); + upstreamDone = true; } } - continue; - } - - if (stopWindows.get()) { - continue; } + else if (o instanceof WindowEndSubscriberIntercept) { + @SuppressWarnings("unchecked") + UnicastProcessor w = ((WindowEndSubscriberIntercept)o).window; - w = UnicastProcessor.create(bufferSize); - - long r = requested(); - if (r != 0L) { - ws.add(w); - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); - } + windows.remove(w); + resources.delete((Disposable)o); + w.onComplete(); } else { - cancel(); - a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests")); - continue; - } - - Publisher p; + @SuppressWarnings("unchecked") + T item = (T)o; - try { - p = ObjectHelper.requireNonNull(close.apply(wo.open), "The publisher supplied is null"); - } catch (Throwable e) { - cancel(); - a.onError(e); - continue; - } - - OperatorWindowBoundaryCloseSubscriber cl = new OperatorWindowBoundaryCloseSubscriber(this, w); - - if (resources.add(cl)) { - windows.getAndIncrement(); - - p.subscribe(cl); + for (UnicastProcessor w : windows) { + w.onNext(item); + } } continue; } - - for (UnicastProcessor w : ws) { - w.onNext(NotificationLite.getValue(o)); + else if (openDone && windows.size() == 0) { + upstream.cancel(); + startSubscriber.cancel(); + resources.dispose(); + terminateDownstream(downstream); + upstreamCanceled = true; + continue; } } - missed = leave(-missed); + missed = addAndGet(-missed); if (missed == 0) { break; } } } - @Override - public boolean accept(Subscriber> a, Object v) { - // not used by this operator - return false; + void terminateDownstream(Subscriber downstream) { + Throwable ex = error.terminate(); + if (ex == null) { + for (UnicastProcessor w : windows) { + w.onComplete(); + } + downstream.onComplete(); + } else if (ex != ExceptionHelper.TERMINATED) { + for (UnicastProcessor w : windows) { + w.onError(ex); + } + downstream.onError(ex); + } } - void open(B b) { - queue.offer(new WindowOperation(null, b)); - if (enter()) { - drainLoop(); + static final class WindowStartItem { + + final B item; + + WindowStartItem(B item) { + this.item = item; } } - void close(OperatorWindowBoundaryCloseSubscriber w) { - resources.delete(w); - queue.offer(new WindowOperation(w.w, null)); - if (enter()) { - drainLoop(); + static final class WindowStartSubscriber extends AtomicReference + implements FlowableSubscriber { + + private static final long serialVersionUID = -3326496781427702834L; + + final WindowBoundaryMainSubscriber parent; + + WindowStartSubscriber(WindowBoundaryMainSubscriber parent) { + this.parent = parent; } - } - } - static final class WindowOperation { - final UnicastProcessor w; - final B open; - WindowOperation(UnicastProcessor w, B open) { - this.w = w; - this.open = open; - } - } + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); + } + } - static final class OperatorWindowBoundaryOpenSubscriber extends DisposableSubscriber { - final WindowBoundaryMainSubscriber parent; + @Override + public void onNext(B t) { + parent.open(t); + } - OperatorWindowBoundaryOpenSubscriber(WindowBoundaryMainSubscriber parent) { - this.parent = parent; - } + @Override + public void onError(Throwable t) { + parent.openError(t); + } - @Override - public void onNext(B t) { - parent.open(t); - } + @Override + public void onComplete() { + parent.openComplete(); + } - @Override - public void onError(Throwable t) { - parent.error(t); + void cancel() { + SubscriptionHelper.cancel(this); + } } - @Override - public void onComplete() { - parent.onComplete(); - } - } + static final class WindowEndSubscriberIntercept extends Flowable + implements FlowableSubscriber, Disposable { - static final class OperatorWindowBoundaryCloseSubscriber extends DisposableSubscriber { - final WindowBoundaryMainSubscriber parent; - final UnicastProcessor w; + final WindowBoundaryMainSubscriber parent; - boolean done; + final UnicastProcessor window; - OperatorWindowBoundaryCloseSubscriber(WindowBoundaryMainSubscriber parent, UnicastProcessor w) { - this.parent = parent; - this.w = w; - } + final AtomicReference upstream; - @Override - public void onNext(V t) { - cancel(); - onComplete(); - } + final AtomicBoolean once; - @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; + WindowEndSubscriberIntercept(WindowBoundaryMainSubscriber parent, UnicastProcessor window) { + this.parent = parent; + this.window = window; + this.upstream = new AtomicReference(); + this.once = new AtomicBoolean(); } - done = true; - parent.error(t); - } - @Override - public void onComplete() { - if (done) { - return; + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(upstream, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(V t) { + if (SubscriptionHelper.cancel(upstream)) { + parent.close(this); + } + } + + @Override + public void onError(Throwable t) { + if (isDisposed()) { + RxJavaPlugins.onError(t); + } else { + parent.closeError(t); + } + } + + @Override + public void onComplete() { + parent.close(this); + } + + @Override + public void dispose() { + SubscriptionHelper.cancel(upstream); + } + + @Override + public boolean isDisposed() { + return upstream.get() == SubscriptionHelper.CANCELLED; + } + + @Override + protected void subscribeActual(Subscriber s) { + window.subscribe(s); + once.set(true); + } + + boolean tryAbandon() { + return !once.get() && once.compareAndSet(false, true); } - done = true; - parent.close(this); } } + } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundary.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundary.java index cbcdfc3c91..48de711a04 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundary.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundary.java @@ -230,7 +230,11 @@ void drain() { window = w; windows.getAndIncrement(); - downstream.onNext(w); + ObservableWindowSubscribeIntercept intercept = new ObservableWindowSubscribeIntercept(w); + downstream.onNext(intercept); + if (intercept.tryAbandon()) { + w.onComplete(); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java index 35e92c3fec..037c7d0773 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java @@ -24,62 +24,79 @@ import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; import io.reactivex.rxjava3.internal.functions.ObjectHelper; -import io.reactivex.rxjava3.internal.observers.QueueDrainObserver; +import io.reactivex.rxjava3.internal.fuseable.SimplePlainQueue; import io.reactivex.rxjava3.internal.queue.MpscLinkedQueue; -import io.reactivex.rxjava3.internal.util.NotificationLite; -import io.reactivex.rxjava3.observers.*; +import io.reactivex.rxjava3.internal.util.*; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.subjects.UnicastSubject; public final class ObservableWindowBoundarySelector extends AbstractObservableWithUpstream> { final ObservableSource open; - final Function> close; + final Function> closingIndicator; final int bufferSize; public ObservableWindowBoundarySelector( ObservableSource source, - ObservableSource open, Function> close, + ObservableSource open, Function> closingIndicator, int bufferSize) { super(source); this.open = open; - this.close = close; + this.closingIndicator = closingIndicator; this.bufferSize = bufferSize; } @Override public void subscribeActual(Observer> t) { source.subscribe(new WindowBoundaryMainObserver( - new SerializedObserver>(t), - open, close, bufferSize)); + t, open, closingIndicator, bufferSize)); } static final class WindowBoundaryMainObserver - extends QueueDrainObserver> - implements Disposable { + extends AtomicInteger + implements Observer, Disposable, Runnable { + private static final long serialVersionUID = 8646217640096099753L; + + final Observer> downstream; final ObservableSource open; - final Function> close; + final Function> closingIndicator; final int bufferSize; final CompositeDisposable resources; - Disposable upstream; + final WindowStartObserver startObserver; + + final List> windows; + + final SimplePlainQueue queue; + + final AtomicLong windowCount; - final AtomicReference boundary = new AtomicReference(); + final AtomicBoolean downstreamDisposed; - final List> ws; + final AtomicLong requested; + long emitted; - final AtomicLong windows = new AtomicLong(); + volatile boolean upstreamCanceled; - final AtomicBoolean stopWindows = new AtomicBoolean(); + volatile boolean upstreamDone; + volatile boolean openDone; + final AtomicThrowable error; - WindowBoundaryMainObserver(Observer> actual, - ObservableSource open, Function> close, int bufferSize) { - super(actual, new MpscLinkedQueue()); + Disposable upstream; + + WindowBoundaryMainObserver(Observer> downstream, + ObservableSource open, Function> closingIndicator, int bufferSize) { + this.downstream = downstream; + this.queue = new MpscLinkedQueue(); this.open = open; - this.close = close; + this.closingIndicator = closingIndicator; this.bufferSize = bufferSize; this.resources = new CompositeDisposable(); - this.ws = new ArrayList>(); - windows.lazySet(1); + this.windows = new ArrayList>(); + this.windowCount = new AtomicLong(1L); + this.downstreamDisposed = new AtomicBoolean(); + this.error = new AtomicThrowable(); + this.startObserver = new WindowStartObserver(this); + this.requested = new AtomicLong(); } @Override @@ -89,281 +106,320 @@ public void onSubscribe(Disposable d) { downstream.onSubscribe(this); - if (stopWindows.get()) { - return; - } - - OperatorWindowBoundaryOpenObserver os = new OperatorWindowBoundaryOpenObserver(this); - - if (boundary.compareAndSet(null, os)) { - open.subscribe(os); - } + open.subscribe(startObserver); } } @Override public void onNext(T t) { - if (fastEnter()) { - for (UnicastSubject w : ws) { - w.onNext(t); - } - if (leave(-1) == 0) { - return; - } - } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; - } - } - drainLoop(); + queue.offer(t); + drain(); } @Override public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - error = t; - done = true; - - if (enter()) { - drainLoop(); - } - - if (windows.decrementAndGet() == 0) { - resources.dispose(); + startObserver.dispose(); + resources.dispose(); + if (error.tryAddThrowableOrReport(t)) { + upstreamDone = true; + drain(); } - - downstream.onError(t); } @Override public void onComplete() { - if (done) { - return; - } - done = true; - - if (enter()) { - drainLoop(); - } - - if (windows.decrementAndGet() == 0) { - resources.dispose(); - } - - downstream.onComplete(); - } - - void error(Throwable t) { - upstream.dispose(); + startObserver.dispose(); resources.dispose(); - onError(t); + upstreamDone = true; + drain(); } @Override public void dispose() { - if (stopWindows.compareAndSet(false, true)) { - DisposableHelper.dispose(boundary); - if (windows.decrementAndGet() == 0) { + if (downstreamDisposed.compareAndSet(false, true)) { + if (windowCount.decrementAndGet() == 0) { upstream.dispose(); + startObserver.dispose(); + resources.dispose(); + error.tryTerminateAndReport(); + upstreamCanceled = true; + drain(); + } else { + startObserver.dispose(); } } } @Override public boolean isDisposed() { - return stopWindows.get(); + return downstreamDisposed.get(); + } + + @Override + public void run() { + if (windowCount.decrementAndGet() == 0) { + upstream.dispose(); + startObserver.dispose(); + resources.dispose(); + error.tryTerminateAndReport(); + upstreamCanceled = true; + drain(); + } + } + + void open(B startValue) { + queue.offer(new WindowStartItem(startValue)); + drain(); } - void disposeBoundary() { + void openError(Throwable t) { + upstream.dispose(); resources.dispose(); - DisposableHelper.dispose(boundary); + if (error.tryAddThrowableOrReport(t)) { + upstreamDone = true; + drain(); + } } - void drainLoop() { - final MpscLinkedQueue q = (MpscLinkedQueue)queue; - final Observer> a = downstream; - final List> ws = this.ws; - int missed = 1; + void openComplete() { + openDone = true; + drain(); + } - for (;;) { + void close(WindowEndObserverIntercept what) { + queue.offer(what); + drain(); + } - for (;;) { - boolean d = done; + void closeError(Throwable t) { + upstream.dispose(); + startObserver.dispose(); + resources.dispose(); + if (error.tryAddThrowableOrReport(t)) { + upstreamDone = true; + drain(); + } + } - Object o = q.poll(); + void drain() { + if (getAndIncrement() != 0) { + return; + } - boolean empty = o == null; + int missed = 1; + final Observer> downstream = this.downstream; + final SimplePlainQueue queue = this.queue; + final List> windows = this.windows; - if (d && empty) { - disposeBoundary(); - Throwable e = error; - if (e != null) { - for (UnicastSubject w : ws) { - w.onError(e); - } - } else { - for (UnicastSubject w : ws) { - w.onComplete(); - } + for (;;) { + if (upstreamCanceled) { + queue.clear(); + windows.clear(); + } else { + boolean isDone = upstreamDone; + Object o = queue.poll(); + boolean isEmpty = o == null; + + if (isDone) { + if (isEmpty || error.get() != null) { + terminateDownstream(downstream); + upstreamCanceled = true; + continue; } - ws.clear(); - return; } - if (empty) { - break; - } + if (!isEmpty) { + if (o instanceof WindowStartItem) { + if (!downstreamDisposed.get()) { + @SuppressWarnings("unchecked") + B startItem = ((WindowStartItem)o).item; + + ObservableSource endSource; + try { + endSource = ObjectHelper.requireNonNull(closingIndicator.apply(startItem), "The closingIndicator returned a null ObservableSource"); + } catch (Throwable ex) { + upstream.dispose(); + startObserver.dispose(); + resources.dispose(); + Exceptions.throwIfFatal(ex); + error.tryAddThrowableOrReport(ex); + upstreamDone = true; + continue; + } - if (o instanceof WindowOperation) { - @SuppressWarnings("unchecked") - WindowOperation wo = (WindowOperation) o; + windowCount.getAndIncrement(); + UnicastSubject newWindow = UnicastSubject.create(bufferSize, this); + WindowEndObserverIntercept endObserver = new WindowEndObserverIntercept(this, newWindow); - UnicastSubject w = wo.w; - if (w != null) { - if (ws.remove(wo.w)) { - wo.w.onComplete(); + downstream.onNext(endObserver); - if (windows.decrementAndGet() == 0) { - disposeBoundary(); - return; + if (endObserver.tryAbandon()) { + newWindow.onComplete(); + } else { + windows.add(newWindow); + resources.add(endObserver); + endSource.subscribe(endObserver); } } - continue; - } - - if (stopWindows.get()) { - continue; } + else if (o instanceof WindowEndObserverIntercept) { + @SuppressWarnings("unchecked") + UnicastSubject w = ((WindowEndObserverIntercept)o).window; - w = UnicastSubject.create(bufferSize); - - ws.add(w); - a.onNext(w); - - ObservableSource p; - - try { - p = ObjectHelper.requireNonNull(close.apply(wo.open), "The ObservableSource supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - stopWindows.set(true); - a.onError(e); - continue; - } - - OperatorWindowBoundaryCloseObserver cl = new OperatorWindowBoundaryCloseObserver(this, w); - - if (resources.add(cl)) { - windows.getAndIncrement(); + windows.remove(w); + resources.delete((Disposable)o); + w.onComplete(); + } else { + @SuppressWarnings("unchecked") + T item = (T)o; - p.subscribe(cl); + for (UnicastSubject w : windows) { + w.onNext(item); + } } continue; } - - for (UnicastSubject w : ws) { - w.onNext(NotificationLite.getValue(o)); + else if (openDone && windows.size() == 0) { + upstream.dispose(); + startObserver.dispose(); + resources.dispose(); + terminateDownstream(downstream); + upstreamCanceled = true; + continue; } } - missed = leave(-missed); + missed = addAndGet(-missed); if (missed == 0) { break; } } } - @Override - public void accept(Observer> a, Object v) { + void terminateDownstream(Observer downstream) { + Throwable ex = error.terminate(); + if (ex == null) { + for (UnicastSubject w : windows) { + w.onComplete(); + } + downstream.onComplete(); + } else if (ex != ExceptionHelper.TERMINATED) { + for (UnicastSubject w : windows) { + w.onError(ex); + } + downstream.onError(ex); + } } - void open(B b) { - queue.offer(new WindowOperation(null, b)); - if (enter()) { - drainLoop(); + static final class WindowStartItem { + + final B item; + + WindowStartItem(B item) { + this.item = item; } } - void close(OperatorWindowBoundaryCloseObserver w) { - resources.delete(w); - queue.offer(new WindowOperation(w.w, null)); - if (enter()) { - drainLoop(); + static final class WindowStartObserver extends AtomicReference + implements Observer { + + private static final long serialVersionUID = -3326496781427702834L; + + final WindowBoundaryMainObserver parent; + + WindowStartObserver(WindowBoundaryMainObserver parent) { + this.parent = parent; } - } - } - static final class WindowOperation { - final UnicastSubject w; - final B open; - WindowOperation(UnicastSubject w, B open) { - this.w = w; - this.open = open; - } - } + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } - static final class OperatorWindowBoundaryOpenObserver extends DisposableObserver { - final WindowBoundaryMainObserver parent; + @Override + public void onNext(B t) { + parent.open(t); + } - OperatorWindowBoundaryOpenObserver(WindowBoundaryMainObserver parent) { - this.parent = parent; - } + @Override + public void onError(Throwable t) { + parent.openError(t); + } - @Override - public void onNext(B t) { - parent.open(t); - } + @Override + public void onComplete() { + parent.openComplete(); + } - @Override - public void onError(Throwable t) { - parent.error(t); + void dispose() { + DisposableHelper.dispose(this); + } } - @Override - public void onComplete() { - parent.onComplete(); - } - } + static final class WindowEndObserverIntercept extends Observable + implements Observer, Disposable { - static final class OperatorWindowBoundaryCloseObserver extends DisposableObserver { - final WindowBoundaryMainObserver parent; - final UnicastSubject w; + final WindowBoundaryMainObserver parent; - boolean done; + final UnicastSubject window; - OperatorWindowBoundaryCloseObserver(WindowBoundaryMainObserver parent, UnicastSubject w) { - this.parent = parent; - this.w = w; - } + final AtomicReference upstream; - @Override - public void onNext(V t) { - dispose(); - onComplete(); - } + final AtomicBoolean once; - @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; + WindowEndObserverIntercept(WindowBoundaryMainObserver parent, UnicastSubject window) { + this.parent = parent; + this.window = window; + this.upstream = new AtomicReference(); + this.once = new AtomicBoolean(); } - done = true; - parent.error(t); - } - @Override - public void onComplete() { - if (done) { - return; + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(upstream, d); + } + + @Override + public void onNext(V t) { + if (DisposableHelper.dispose(upstream)) { + parent.close(this); + } + } + + @Override + public void onError(Throwable t) { + if (isDisposed()) { + RxJavaPlugins.onError(t); + } else { + parent.closeError(t); + } + } + + @Override + public void onComplete() { + parent.close(this); + } + + @Override + public void dispose() { + DisposableHelper.dispose(upstream); + } + + @Override + public boolean isDisposed() { + return upstream.get() == DisposableHelper.DISPOSED; + } + + @Override + protected void subscribeActual(Observer o) { + window.subscribe(o); + once.set(true); + } + + boolean tryAbandon() { + return !once.get() && once.compareAndSet(false, true); } - done = true; - parent.close(this); } } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithFlowableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithFlowableTest.java index fcd2390f7e..8b001803c0 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithFlowableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithFlowableTest.java @@ -26,7 +26,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.exceptions.*; -import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; import io.reactivex.rxjava3.plugins.RxJavaPlugins; @@ -418,6 +418,12 @@ protected void subscribeActual(Subscriber subscriber) { ref.set(subscriber); } }) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) .to(TestHelper.>testConsumer()); ts @@ -674,4 +680,65 @@ public void run() { TestHelper.race(r1, r2); } } + + @Test + public void cancellingWindowCancelsUpstream() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.window(Flowable.just(1).concatWith(Flowable.never())) + .take(1) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(Flowable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertResult(1); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + } + + @Test + public void windowAbandonmentCancelsUpstream() { + PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestSubscriber> ts = pp.window(Flowable.never()) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + ts + .assertValueCount(1) + ; + + pp.onNext(1); + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertNotComplete(); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + + inner.get().test().assertResult(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java index 3a76a4c7a8..9420794dc5 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java @@ -17,7 +17,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.*; import org.junit.*; import org.reactivestreams.*; @@ -160,7 +160,14 @@ public void noUnsubscribeAndNoLeak() { public Flowable apply(Integer t) { return close; } - }).subscribe(ts); + }) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) + .subscribe(ts); open.onNext(1); source.onNext(1); @@ -197,7 +204,14 @@ public void unsubscribeAll() { public Flowable apply(Integer t) { return close; } - }).subscribe(ts); + }) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) + .subscribe(ts); open.onNext(1); @@ -248,16 +262,6 @@ public Flowable apply(Flowable v) throws Exception { .assertResult(1, 2); } - @Test - public void badSourceCallable() { - TestHelper.checkBadSourceFlowable(new Function, Object>() { - @Override - public Object apply(Flowable f) throws Exception { - return f.window(Flowable.just(1), Functions.justFunction(Flowable.never())); - } - }, false, 1, 1, (Object[])null); - } - @Test public void boundarySelectorNormal() { PublishProcessor source = PublishProcessor.create(); @@ -372,6 +376,12 @@ protected void subscribeActual( }; } }) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) .test() .assertValueCount(1) .assertNoErrors() @@ -406,6 +416,12 @@ public Flowable apply(Integer v) throws Exception { return flowableDisposed(closeDisposed); } }) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) .to(TestHelper.>testConsumer()) .assertSubscribed() .assertNoErrors() @@ -436,4 +452,93 @@ public void mainWindowMissingBackpressure() { assertFalse(source.hasSubscribers()); assertFalse(boundary.hasSubscribers()); } + + @Test + public void cancellingWindowCancelsUpstream() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.window(Flowable.just(1).concatWith(Flowable.never()), Functions.justFunction(Flowable.never())) + .take(1) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(Flowable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertResult(1); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + } + + @Test + public void windowAbandonmentCancelsUpstream() { + PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestSubscriber> ts = pp.window(Flowable.just(1).concatWith(Flowable.never()), + Functions.justFunction(Flowable.never())) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + ts + .assertValueCount(1) + ; + + pp.onNext(1); + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertNotComplete(); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + + inner.get().test().assertResult(); + } + + @SuppressWarnings("unchecked") + @Test + public void closingIndicatorFunctionCrash() { + + PublishProcessor source = PublishProcessor.create(); + PublishProcessor boundary = PublishProcessor.create(); + + TestSubscriber> ts = source.window(boundary, new Function>() { + @Override + public Publisher apply(Integer end) throws Throwable { + throw new TestException(); + } + }) + .test() + ; + + ts.assertEmpty(); + + boundary.onNext(1); + + ts.assertFailure(TestException.class); + + assertFalse(source.hasSubscribers()); + assertFalse(boundary.hasSubscribers()); + + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithObservableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithObservableTest.java index 1ac8c7209b..e6804ddcab 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithObservableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithObservableTest.java @@ -28,7 +28,7 @@ import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.disposables.*; import io.reactivex.rxjava3.exceptions.*; -import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.observers.*; import io.reactivex.rxjava3.plugins.RxJavaPlugins; @@ -378,6 +378,12 @@ protected void subscribeActual(Observer observer) { ref.set(observer); } }) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) .to(TestHelper.>testConsumer()); to @@ -636,4 +642,65 @@ public void run() { TestHelper.race(r1, r2); } } + + @Test + public void cancellingWindowCancelsUpstream() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.window(Observable.never()) + .take(1) + .flatMap(new Function, Observable>() { + @Override + public Observable apply(Observable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertResult(1); + + assertFalse("Subject still has observers!", ps.hasObservers()); + } + + @Test + public void windowAbandonmentCancelsUpstream() { + PublishSubject ps = PublishSubject.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestObserver> to = ps.window(Observable.never()) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + to + .assertValueCount(1) + ; + + ps.onNext(1); + + assertTrue(ps.hasObservers()); + + to.dispose(); + + to + .assertValueCount(1) + .assertNoErrors() + .assertNotComplete(); + + assertFalse("Subject still has observers!", ps.hasObservers()); + + inner.get().test().assertResult(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithSizeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithSizeTest.java index 8053b0f532..b7ef436399 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithSizeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithSizeTest.java @@ -398,7 +398,7 @@ public Observable apply(Observable w) throws Throwable { to .assertResult(1); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); } @Test @@ -426,7 +426,7 @@ public void accept(Observable v) throws Throwable { .assertNoErrors() .assertComplete(); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); inner.get().test().assertResult(1); } @@ -452,7 +452,7 @@ public Observable apply(Observable w) throws Throwable { to .assertResult(1); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); } @Test @@ -480,7 +480,7 @@ public void accept(Observable v) throws Throwable { .assertNoErrors() .assertComplete(); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); inner.get().test().assertResult(1); } @@ -506,7 +506,7 @@ public Observable apply(Observable w) throws Throwable { to .assertResult(1); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); } @Test @@ -534,7 +534,7 @@ public void accept(Observable v) throws Throwable { .assertNoErrors() .assertComplete(); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); inner.get().test().assertResult(1); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java index 90732ddf1e..da02116cd9 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java @@ -17,7 +17,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.*; import org.junit.*; @@ -161,7 +161,14 @@ public void noUnsubscribeAndNoLeak() { public Observable apply(Integer t) { return close; } - }).subscribe(to); + }) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) + .subscribe(to); open.onNext(1); source.onNext(1); @@ -199,7 +206,14 @@ public void unsubscribeAll() { public Observable apply(Integer t) { return close; } - }).subscribe(to); + }) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) + .subscribe(to); open.onNext(1); @@ -365,6 +379,12 @@ protected void subscribeActual( }; } }) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) .test() .assertValueCount(1) .assertNoErrors() @@ -399,6 +419,12 @@ public ObservableSource apply(Integer v) throws Exception { return observableDisposed(closeDisposed); } }) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable w) throws Throwable { + w.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); // avoid abandonment + } + }) .to(TestHelper.>testConsumer()) .assertSubscribed() .assertNoErrors() @@ -409,4 +435,101 @@ public ObservableSource apply(Integer v) throws Exception { assertTrue(openDisposed.get()); assertTrue(closeDisposed.get()); } + + @Test + public void cancellingWindowCancelsUpstream() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.window(Observable.just(1).concatWith(Observable.never()), Functions.justFunction(Observable.never())) + .take(1) + .flatMap(new Function, Observable>() { + @Override + public Observable apply(Observable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to + .assertResult(1); + + assertFalse("Subject still has observers!", ps.hasObservers()); + } + + @Test + public void windowAbandonmentCancelsUpstream() { + PublishSubject ps = PublishSubject.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestObserver> to = ps.window(Observable.just(1).concatWith(Observable.never()), + Functions.justFunction(Observable.never())) + .doOnNext(new Consumer>() { + @Override + public void accept(Observable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + to + .assertValueCount(1) + ; + + ps.onNext(1); + + assertTrue(ps.hasObservers()); + + to.dispose(); + + to + .assertValueCount(1) + .assertNoErrors() + .assertNotComplete(); + + assertFalse("Subject still has observers!", ps.hasObservers()); + + inner.get().test().assertResult(); + } + + @SuppressWarnings("unchecked") + @Test + public void closingIndicatorFunctionCrash() { + + PublishSubject source = PublishSubject.create(); + PublishSubject boundary = PublishSubject.create(); + + TestObserver> to = source.window(boundary, new Function>() { + @Override + public Observable apply(Integer end) throws Throwable { + throw new TestException(); + } + }) + .test() + ; + + to.assertEmpty(); + + boundary.onNext(1); + + to.assertFailure(TestException.class); + + assertFalse(source.hasObservers()); + assertFalse(boundary.hasObservers()); + } + + @SuppressWarnings("unchecked") + @Test + public void mainError() { + Observable.error(new TestException()) + .window(Observable.never(), Functions.justFunction(Observable.never())) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java index 271cde0208..8eaebf5991 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java @@ -971,7 +971,7 @@ public Observable apply(Observable w) throws Throwable { to .assertResult(1); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); } @Test @@ -990,7 +990,7 @@ public void accept(Observable v) throws Throwable { }) .test(); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); to .assertValueCount(1) @@ -1021,7 +1021,7 @@ public Observable apply(Observable w) throws Throwable { to .assertResult(1); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); } @Test @@ -1040,7 +1040,7 @@ public void accept(Observable v) throws Throwable { }) .test(); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); to .assertValueCount(1) @@ -1071,7 +1071,7 @@ public Observable apply(Observable w) throws Throwable { to .assertResult(1); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); } @Test @@ -1090,7 +1090,7 @@ public void accept(Observable v) throws Throwable { }) .test(); - assertFalse("Subject still has subscribers!", ps.hasObservers()); + assertFalse("Subject still has observers!", ps.hasObservers()); to .assertValueCount(1)