From b5df250f9cbd32921516807d29949576cd7d02c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 20 Jun 2019 20:22:24 +0200 Subject: [PATCH 1/2] 3.x: Add X.fromSupplier() --- src/main/java/io/reactivex/Completable.java | 32 ++ src/main/java/io/reactivex/Flowable.java | 42 +++ src/main/java/io/reactivex/Maybe.java | 47 +++ src/main/java/io/reactivex/Observable.java | 38 +++ src/main/java/io/reactivex/Single.java | 40 +++ .../completable/CompletableFromSupplier.java | 53 +++ .../flowable/FlowableFromCallable.java | 5 +- .../flowable/FlowableFromSupplier.java | 63 ++++ .../operators/maybe/MaybeFromSupplier.java | 71 ++++ .../observable/ObservableFromCallable.java | 7 +- .../observable/ObservableFromSupplier.java | 62 ++++ .../operators/single/SingleFromSupplier.java | 62 ++++ .../CompletableFromSupplierTest.java | 185 +++++++++++ .../flowable/FlowableFromSupplierTest.java | 269 +++++++++++++++ .../maybe/MaybeFromSupplierTest.java | 222 +++++++++++++ .../ObservableFromSupplierTest.java | 314 ++++++++++++++++++ .../single/SingleFromCallableTest.java | 4 +- .../single/SingleFromSupplierTest.java | 275 +++++++++++++++ .../io/reactivex/tck/FromCallableTckTest.java | 14 + .../io/reactivex/tck/FromSupplierTckTest.java | 56 ++++ 20 files changed, 1855 insertions(+), 6 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/operators/completable/CompletableFromSupplier.java create mode 100644 src/main/java/io/reactivex/internal/operators/flowable/FlowableFromSupplier.java create mode 100644 src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java create mode 100644 src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleFromSupplier.java create mode 100644 src/test/java/io/reactivex/internal/operators/completable/CompletableFromSupplierTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableFromSupplierTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/observable/ObservableFromSupplierTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java create mode 100644 src/test/java/io/reactivex/tck/FromSupplierTckTest.java diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 92de75d97b..b618650d5d 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -444,6 +444,8 @@ public static Completable fromAction(final Action run) { * * @param callable the callable instance to execute for each subscriber * @return the new Completable instance + * @see #defer(Supplier) + * @see #fromSupplier(Supplier) */ @CheckReturnValue @NonNull @@ -609,6 +611,36 @@ public static Completable fromSingle(final SingleSource single) { return RxJavaPlugins.onAssembly(new CompletableFromSingle(single)); } + /** + * Returns a Completable which when subscribed, executes the supplier function, ignores its + * normal result and emits onError or onComplete only. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@link Supplier} throws an exception, the respective {@link Throwable} is + * delivered to the downstream via {@link CompletableObserver#onError(Throwable)}, + * except when the downstream has disposed this {@code Completable} source. + * In this latter case, the {@code Throwable} is delivered to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}. + *
+ *
+ * @param supplier the Supplier instance to execute for each subscriber + * @return the new Completable instance + * @see #defer(Supplier) + * @see #fromCallable(Callable) + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Completable fromSupplier(final Supplier supplier) { + ObjectHelper.requireNonNull(supplier, "callable is null"); + return RxJavaPlugins.onAssembly(new CompletableFromSupplier(supplier)); + } + /** * Returns a Completable instance that subscribes to all sources at once and * completes only when all source Completables complete or one of them emits an error. diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 07b76aa101..50c1fe436a 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -2087,6 +2087,7 @@ public static Flowable fromArray(T... items) { * the type of the item emitted by the Publisher * @return a Flowable whose {@link Subscriber}s' subscriptions trigger an invocation of the given function * @see #defer(Supplier) + * @see #fromSupplier(Supplier) * @since 2.0 */ @CheckReturnValue @@ -2331,6 +2332,47 @@ public static Flowable fromPublisher(final Publisher source) return RxJavaPlugins.onAssembly(new FlowableFromPublisher(source)); } + /** + * Returns a Flowable that, when a Subscriber subscribes to it, invokes a supplier function you specify and then + * emits the value returned from that function. + *

+ * + *

+ * This allows you to defer the execution of the function you specify until a Subscriber subscribes to the + * Publisher. That is to say, it makes the function "lazy." + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@link Supplier} throws an exception, the respective {@link Throwable} is + * delivered to the downstream via {@link Subscriber#onError(Throwable)}, + * except when the downstream has canceled this {@code Flowable} source. + * In this latter case, the {@code Throwable} is delivered to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}. + *
+ *
+ * + * @param supplier + * a function, the execution of which should be deferred; {@code fromSupplier} will invoke this + * function only when a Subscriber subscribes to the Publisher that {@code fromSupplier} returns + * @param + * the type of the item emitted by the Publisher + * @return a Flowable whose {@link Subscriber}s' subscriptions trigger an invocation of the given function + * @see #defer(Supplier) + * @see #fromCallable(Callable) + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable fromSupplier(Supplier supplier) { + ObjectHelper.requireNonNull(supplier, "supplier is null"); + return RxJavaPlugins.onAssembly(new FlowableFromSupplier(supplier)); + } + /** * Returns a cold, synchronous, stateless and backpressure-aware generator of values. *

diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 5b03dd8184..72954d89d0 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -763,6 +763,8 @@ public static Maybe fromSingle(SingleSource singleSource) { * @param * the type of the item emitted by the {@link Maybe}. * @return a new Maybe instance + * @see #defer(Supplier) + * @see #fromSupplier(Supplier) */ @CheckReturnValue @NonNull @@ -865,6 +867,51 @@ public static Maybe fromRunnable(final Runnable run) { return RxJavaPlugins.onAssembly(new MaybeFromRunnable(run)); } + /** + * Returns a {@link Maybe} that invokes the given {@link Supplier} for each individual {@link MaybeObserver} that + * subscribes and emits the resulting non-null item via {@code onSuccess} while + * considering a {@code null} result from the {@code Supplier} as indication for valueless completion + * via {@code onComplete}. + *

+ * This operator allows you to defer the execution of the given {@code Supplier} until a {@code MaybeObserver} + * subscribes to the returned {@link Maybe}. In other terms, this source operator evaluates the given + * {@code Callable} "lazily". + *

+ * Note that the {@code null} handling of this operator differs from the similar source operators in the other + * {@link io.reactivex base reactive classes}. Those operators signal a {@code NullPointerException} if the value returned by their + * {@code Supplier} is {@code null} while this {@code fromSupplier} considers it to indicate the + * returned {@code Maybe} is empty. + *

+ *
Scheduler:
+ *
{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
Any non-fatal exception thrown by {@link Supplier#get()} will be forwarded to {@code onError}, + * except if the {@code MaybeObserver} disposed the subscription in the meantime. In this latter case, + * the exception is forwarded to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} wrapped into a + * {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}. + * Fatal exceptions are rethrown and usually will end up in the executing thread's + * {@link java.lang.Thread.UncaughtExceptionHandler#uncaughtException(Thread, Throwable)} handler.
+ *
+ * + * @param supplier + * a {@link Supplier} instance whose execution should be deferred and performed for each individual + * {@code MaybeObserver} that subscribes to the returned {@link Maybe}. + * @param + * the type of the item emitted by the {@link Maybe}. + * @return a new Maybe instance + * @see #defer(Supplier) + * @see #fromCallable(Callable) + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Maybe fromSupplier(@NonNull final Supplier supplier) { + ObjectHelper.requireNonNull(supplier, "supplier is null"); + return RxJavaPlugins.onAssembly(new MaybeFromSupplier(supplier)); + } + /** * Returns a {@code Maybe} that emits a specified item. *

diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 6123ce455d..a258106c64 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -1798,6 +1798,7 @@ public static Observable fromArray(T... items) { * the type of the item emitted by the ObservableSource * @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function * @see #defer(Supplier) + * @see #fromSupplier(Supplier) * @since 2.0 */ @CheckReturnValue @@ -2021,6 +2022,43 @@ public static Observable fromPublisher(Publisher publisher) return RxJavaPlugins.onAssembly(new ObservableFromPublisher(publisher)); } + /** + * Returns an Observable that, when an observer subscribes to it, invokes a supplier function you specify and then + * emits the value returned from that function. + *

+ * + *

+ * This allows you to defer the execution of the function you specify until an observer subscribes to the + * ObservableSource. That is to say, it makes the function "lazy." + *

+ *
Scheduler:
+ *
{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@link Supplier} throws an exception, the respective {@link Throwable} is + * delivered to the downstream via {@link Observer#onError(Throwable)}, + * except when the downstream has disposed this {@code Observable} source. + * In this latter case, the {@code Throwable} is delivered to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}. + *
+ *
+ * @param supplier + * a function, the execution of which should be deferred; {@code fromSupplier} will invoke this + * function only when an observer subscribes to the ObservableSource that {@code fromSupplier} returns + * @param + * the type of the item emitted by the ObservableSource + * @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function + * @see #defer(Supplier) + * @see #fromCallable(Callable) + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Observable fromSupplier(Supplier supplier) { + ObjectHelper.requireNonNull(supplier, "supplier is null"); + return RxJavaPlugins.onAssembly(new ObservableFromSupplier(supplier)); + } + /** * Returns a cold, synchronous and stateless generator of values. *

diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 7fa1f5b905..da446cfafd 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -613,6 +613,8 @@ public static Single error(final Throwable exception) { * @param * the type of the item emitted by the {@link Single}. * @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function. + * @see #defer(Supplier) + * @see #fromSupplier(Supplier) */ @CheckReturnValue @NonNull @@ -811,6 +813,44 @@ public static Single fromObservable(ObservableSource observa return RxJavaPlugins.onAssembly(new ObservableSingleSingle(observableSource, null)); } + /** + * Returns a {@link Single} that invokes passed supplierfunction and emits its result + * for each new SingleObserver that subscribes. + *

+ * Allows you to defer execution of passed function until SingleObserver subscribes to the {@link Single}. + * It makes passed function "lazy". + * Result of the function invocation will be emitted by the {@link Single}. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the {@link Supplier} throws an exception, the respective {@link Throwable} is + * delivered to the downstream via {@link SingleObserver#onError(Throwable)}, + * except when the downstream has disposed this {@code Single} source. + * In this latter case, the {@code Throwable} is delivered to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}. + *
+ *
+ * + * @param supplier + * function which execution should be deferred, it will be invoked when SingleObserver will subscribe to the {@link Single}. + * @param + * the type of the item emitted by the {@link Single}. + * @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function. + * @see #defer(Supplier) + * @see #fromCallable(Callable) + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Single fromSupplier(final Supplier supplier) { + ObjectHelper.requireNonNull(supplier, "supplier is null"); + return RxJavaPlugins.onAssembly(new SingleFromSupplier(supplier)); + } + /** * Returns a {@code Single} that emits a specified item. *

diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSupplier.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSupplier.java new file mode 100644 index 0000000000..d676437b96 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromSupplier.java @@ -0,0 +1,53 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Supplier; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Call a Supplier for each incoming CompletableObserver and signal completion or the thrown exception. + * @since 3.0.0 + */ +public final class CompletableFromSupplier extends Completable { + + final Supplier supplier; + + public CompletableFromSupplier(Supplier supplier) { + this.supplier = supplier; + } + + @Override + protected void subscribeActual(CompletableObserver observer) { + Disposable d = Disposables.empty(); + observer.onSubscribe(d); + try { + supplier.get(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + if (!d.isDisposed()) { + observer.onError(e); + } else { + RxJavaPlugins.onError(e); + } + return; + } + if (!d.isDisposed()) { + observer.onComplete(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromCallable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromCallable.java index 6dcb226daa..e54feffecf 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromCallable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromCallable.java @@ -19,11 +19,12 @@ import io.reactivex.Flowable; import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Supplier; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.DeferredScalarSubscription; import io.reactivex.plugins.RxJavaPlugins; -public final class FlowableFromCallable extends Flowable implements Callable { +public final class FlowableFromCallable extends Flowable implements Supplier { final Callable callable; public FlowableFromCallable(Callable callable) { this.callable = callable; @@ -51,7 +52,7 @@ public void subscribeActual(Subscriber s) { } @Override - public T call() throws Exception { + public T get() throws Throwable { return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value"); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromSupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromSupplier.java new file mode 100644 index 0000000000..e9a5bf9d53 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromSupplier.java @@ -0,0 +1,63 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import org.reactivestreams.Subscriber; + +import io.reactivex.Flowable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Supplier; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.DeferredScalarSubscription; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Call a Supplier for each incoming Subscriber and signal the returned value or the thrown exception. + * @param the value type and element type returned by the supplier and the flow + * @since 3.0.0 + */ +public final class FlowableFromSupplier extends Flowable implements Supplier { + + final Supplier supplier; + + public FlowableFromSupplier(Supplier supplier) { + this.supplier = supplier; + } + + @Override + public void subscribeActual(Subscriber s) { + DeferredScalarSubscription deferred = new DeferredScalarSubscription(s); + s.onSubscribe(deferred); + + T t; + try { + t = ObjectHelper.requireNonNull(supplier.get(), "The supplier returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + if (deferred.isCancelled()) { + RxJavaPlugins.onError(ex); + } else { + s.onError(ex); + } + return; + } + + deferred.complete(t); + } + + @Override + public T get() throws Throwable { + return ObjectHelper.requireNonNull(supplier.get(), "The supplier returned a null value"); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java new file mode 100644 index 0000000000..fa2e9c1cd4 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Supplier; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Executes a callable and signals its value as success or signals an exception. + * + * @param the value type + * @since 3.0.0 + */ +public final class MaybeFromSupplier extends Maybe implements Supplier { + + final Supplier supplier; + + public MaybeFromSupplier(Supplier supplier) { + this.supplier = supplier; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + Disposable d = Disposables.empty(); + observer.onSubscribe(d); + + if (!d.isDisposed()) { + + T v; + + try { + v = supplier.get(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + if (!d.isDisposed()) { + observer.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + return; + } + + if (!d.isDisposed()) { + if (v == null) { + observer.onComplete(); + } else { + observer.onSuccess(v); + } + } + } + } + + @Override + public T get() throws Throwable { + return supplier.get(); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java index fe3c364793..554d504675 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java @@ -17,6 +17,7 @@ import io.reactivex.*; import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Supplier; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.observers.DeferredScalarDisposable; import io.reactivex.plugins.RxJavaPlugins; @@ -25,8 +26,10 @@ * Calls a Callable and emits its resulting single value or signals its exception. * @param the value type */ -public final class ObservableFromCallable extends Observable implements Callable { +public final class ObservableFromCallable extends Observable implements Supplier { + final Callable callable; + public ObservableFromCallable(Callable callable) { this.callable = callable; } @@ -54,7 +57,7 @@ public void subscribeActual(Observer observer) { } @Override - public T call() throws Exception { + public T get() throws Throwable { return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value"); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java new file mode 100644 index 0000000000..2ba55b6f36 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import io.reactivex.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Supplier; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.observers.DeferredScalarDisposable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Calls a Callable and emits its resulting single value or signals its exception. + * @param the value type + * @since 3.0.0 + */ +public final class ObservableFromSupplier extends Observable implements Supplier { + + final Supplier supplier; + + public ObservableFromSupplier(Supplier supplier) { + this.supplier = supplier; + } + + @Override + public void subscribeActual(Observer observer) { + DeferredScalarDisposable d = new DeferredScalarDisposable(observer); + observer.onSubscribe(d); + if (d.isDisposed()) { + return; + } + T value; + try { + value = ObjectHelper.requireNonNull(supplier.get(), "Supplier returned null"); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + if (!d.isDisposed()) { + observer.onError(e); + } else { + RxJavaPlugins.onError(e); + } + return; + } + d.complete(value); + } + + @Override + public T get() throws Throwable { + return ObjectHelper.requireNonNull(supplier.get(), "The callable returned a null value"); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFromSupplier.java b/src/main/java/io/reactivex/internal/operators/single/SingleFromSupplier.java new file mode 100644 index 0000000000..99766aed42 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFromSupplier.java @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Supplier; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Calls a supplier and emits its value or exception to the incoming SingleObserver. + * @param the value type returned + * @since 3.0.0 + */ +public final class SingleFromSupplier extends Single { + + final Supplier supplier; + + public SingleFromSupplier(Supplier supplier) { + this.supplier = supplier; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + Disposable d = Disposables.empty(); + observer.onSubscribe(d); + + if (d.isDisposed()) { + return; + } + T value; + + try { + value = ObjectHelper.requireNonNull(supplier.get(), "The supplier returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + if (!d.isDisposed()) { + observer.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + return; + } + + if (!d.isDisposed()) { + observer.onSuccess(value); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromSupplierTest.java new file mode 100644 index 0000000000..5d13b36d82 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromSupplierTest.java @@ -0,0 +1,185 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Supplier; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.testsupport.TestHelper; + +public class CompletableFromSupplierTest { + + @Test(expected = NullPointerException.class) + public void fromSupplierNull() { + Completable.fromSupplier(null); + } + + @Test + public void fromSupplier() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Completable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + atomicInteger.incrementAndGet(); + return null; + } + }) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromSupplierTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Supplier supplier = new Supplier() { + @Override + public Object get() throws Exception { + atomicInteger.incrementAndGet(); + return null; + } + }; + + Completable.fromSupplier(supplier) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + + Completable.fromSupplier(supplier) + .test() + .assertResult(); + + assertEquals(2, atomicInteger.get()); + } + + @Test + public void fromSupplierInvokesLazy() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Completable completable = Completable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + atomicInteger.incrementAndGet(); + return null; + } + }); + + assertEquals(0, atomicInteger.get()); + + completable + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromSupplierThrows() { + Completable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + throw new UnsupportedOperationException(); + } + }) + .test() + .assertFailure(UnsupportedOperationException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable { + Supplier func = mock(Supplier.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.get()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Completable fromSupplierObservable = Completable.fromSupplier(func); + + Observer observer = TestHelper.mockObserver(); + + TestObserver outer = new TestObserver(observer); + + fromSupplierObservable + .subscribeOn(Schedulers.computation()) + .subscribe(outer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + outer.dispose(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).get(); + + // Observer must not be notified at all + verify(observer).onSubscribe(any(Disposable.class)); + verifyNoMoreInteractions(observer); + } + + @Test + public void fromActionErrorsDisposed() { + final AtomicInteger calls = new AtomicInteger(); + Completable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + calls.incrementAndGet(); + throw new TestException(); + } + }) + .test(true) + .assertEmpty(); + + assertEquals(1, calls.get()); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromSupplierTest.java new file mode 100644 index 0000000000..d909b30c7a --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromSupplierTest.java @@ -0,0 +1,269 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.reactivex.internal.operators.flowable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.reactivestreams.*; + +import io.reactivex.Flowable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; +import io.reactivex.testsupport.TestHelper; + +public class FlowableFromSupplierTest { + + @SuppressWarnings("unchecked") + @Test + public void shouldNotInvokeFuncUntilSubscription() throws Throwable { + Supplier func = mock(Supplier.class); + + when(func.get()).thenReturn(new Object()); + + Flowable fromSupplierFlowable = Flowable.fromSupplier(func); + + verifyZeroInteractions(func); + + fromSupplierFlowable.subscribe(); + + verify(func).get(); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCallOnNextAndOnCompleted() throws Throwable { + Supplier func = mock(Supplier.class); + + when(func.get()).thenReturn("test_value"); + + Flowable fromSupplierFlowable = Flowable.fromSupplier(func); + + Subscriber subscriber = TestHelper.mockSubscriber(); + + fromSupplierFlowable.subscribe(subscriber); + + verify(subscriber).onNext("test_value"); + verify(subscriber).onComplete(); + verify(subscriber, never()).onError(any(Throwable.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCallOnError() throws Throwable { + Supplier func = mock(Supplier.class); + + Throwable throwable = new IllegalStateException("Test exception"); + when(func.get()).thenThrow(throwable); + + Flowable fromSupplierFlowable = Flowable.fromSupplier(func); + + Subscriber subscriber = TestHelper.mockSubscriber(); + + fromSupplierFlowable.subscribe(subscriber); + + verify(subscriber, never()).onNext(any()); + verify(subscriber, never()).onComplete(); + verify(subscriber).onError(throwable); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable { + Supplier func = mock(Supplier.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.get()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Flowable fromSupplierFlowable = Flowable.fromSupplier(func); + + Subscriber subscriber = TestHelper.mockSubscriber(); + + TestSubscriber outer = new TestSubscriber(subscriber); + + fromSupplierFlowable + .subscribeOn(Schedulers.computation()) + .subscribe(outer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + outer.cancel(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).get(); + + // Observer must not be notified at all + verify(subscriber).onSubscribe(any(Subscription.class)); + verifyNoMoreInteractions(subscriber); + } + + @Test + public void shouldAllowToThrowCheckedException() { + final Exception checkedException = new Exception("test exception"); + + Flowable fromSupplierFlowable = Flowable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + throw checkedException; + } + }); + + Subscriber subscriber = TestHelper.mockSubscriber(); + + fromSupplierFlowable.subscribe(subscriber); + + verify(subscriber).onSubscribe(any(Subscription.class)); + verify(subscriber).onError(checkedException); + verifyNoMoreInteractions(subscriber); + } + + @Test + public void fusedFlatMapExecution() { + final int[] calls = { 0 }; + + Flowable.just(1).flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return ++calls[0]; + } + }); + } + }) + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @Test + public void fusedFlatMapExecutionHidden() { + final int[] calls = { 0 }; + + Flowable.just(1).hide().flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return ++calls[0]; + } + }); + } + }) + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @Test + public void fusedFlatMapNull() { + Flowable.just(1).flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return null; + } + }); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void fusedFlatMapNullHidden() { + Flowable.just(1).hide().flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return null; + } + }); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test(timeout = 5000) + public void undeliverableUponCancellation() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final TestSubscriber ts = new TestSubscriber(); + + Flowable.fromSupplier(new Supplier() { + @Override + public Integer get() throws Exception { + ts.cancel(); + throw new TestException(); + } + }) + .subscribe(ts); + + ts.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java new file mode 100644 index 0000000000..a281af9f4b --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java @@ -0,0 +1,222 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Supplier; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.testsupport.TestHelper; + +public class MaybeFromSupplierTest { + + @Test(expected = NullPointerException.class) + public void fromSupplierNull() { + Maybe.fromSupplier(null); + } + + @Test + public void fromSupplier() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Maybe.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + atomicInteger.incrementAndGet(); + return null; + } + }) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromSupplierTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Supplier callable = new Supplier() { + @Override + public Object get() throws Exception { + atomicInteger.incrementAndGet(); + return null; + } + }; + + Maybe.fromSupplier(callable) + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + + Maybe.fromSupplier(callable) + .test() + .assertResult(); + + assertEquals(2, atomicInteger.get()); + } + + @Test + public void fromSupplierInvokesLazy() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Maybe completable = Maybe.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + atomicInteger.incrementAndGet(); + return null; + } + }); + + assertEquals(0, atomicInteger.get()); + + completable + .test() + .assertResult(); + + assertEquals(1, atomicInteger.get()); + } + + @Test + public void fromSupplierThrows() { + Maybe.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + throw new UnsupportedOperationException(); + } + }) + .test() + .assertFailure(UnsupportedOperationException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void callable() throws Throwable { + final int[] counter = { 0 }; + + Maybe m = Maybe.fromSupplier(new Supplier() { + @Override + public Integer get() throws Exception { + counter[0]++; + return 0; + } + }); + + assertTrue(m.getClass().toString(), m instanceof Supplier); + + assertEquals(0, ((Supplier)m).get()); + + assertEquals(1, counter[0]); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestObserver to = Maybe.fromSupplier(new Supplier() { + @Override + public Integer get() throws Exception { + cdl1.countDown(); + cdl2.await(5, TimeUnit.SECONDS); + return 1; + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + to.dispose(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertUndeliverable(errors, 0, InterruptedException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable { + Supplier func = mock(Supplier.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.get()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Maybe fromSupplierObservable = Maybe.fromSupplier(func); + + Observer observer = TestHelper.mockObserver(); + + TestObserver outer = new TestObserver(observer); + + fromSupplierObservable + .subscribeOn(Schedulers.computation()) + .subscribe(outer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + outer.dispose(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).get(); + + // Observer must not be notified at all + verify(observer).onSubscribe(any(Disposable.class)); + verifyNoMoreInteractions(observer); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromSupplierTest.java new file mode 100644 index 0000000000..e3df29e1c5 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFromSupplierTest.java @@ -0,0 +1,314 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.testsupport.TestHelper; + +public class ObservableFromSupplierTest { + + @SuppressWarnings("unchecked") + @Test + public void shouldNotInvokeFuncUntilSubscription() throws Throwable { + Supplier func = mock(Supplier.class); + + when(func.get()).thenReturn(new Object()); + + Observable fromSupplierObservable = Observable.fromSupplier(func); + + verifyZeroInteractions(func); + + fromSupplierObservable.subscribe(); + + verify(func).get(); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCallOnNextAndOnCompleted() throws Throwable { + Supplier func = mock(Supplier.class); + + when(func.get()).thenReturn("test_value"); + + Observable fromSupplierObservable = Observable.fromSupplier(func); + + Observer observer = TestHelper.mockObserver(); + + fromSupplierObservable.subscribe(observer); + + verify(observer).onNext("test_value"); + verify(observer).onComplete(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCallOnError() throws Throwable { + Supplier func = mock(Supplier.class); + + Throwable throwable = new IllegalStateException("Test exception"); + when(func.get()).thenThrow(throwable); + + Observable fromSupplierObservable = Observable.fromSupplier(func); + + Observer observer = TestHelper.mockObserver(); + + fromSupplierObservable.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onComplete(); + verify(observer).onError(throwable); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable { + Supplier func = mock(Supplier.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.get()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Observable fromSupplierObservable = Observable.fromSupplier(func); + + Observer observer = TestHelper.mockObserver(); + + TestObserver outer = new TestObserver(observer); + + fromSupplierObservable + .subscribeOn(Schedulers.computation()) + .subscribe(outer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + outer.dispose(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).get(); + + // Observer must not be notified at all + verify(observer).onSubscribe(any(Disposable.class)); + verifyNoMoreInteractions(observer); + } + + @Test + public void shouldAllowToThrowCheckedException() { + final Exception checkedException = new Exception("test exception"); + + Observable fromSupplierObservable = Observable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + throw checkedException; + } + }); + + Observer observer = TestHelper.mockObserver(); + + fromSupplierObservable.subscribe(observer); + + verify(observer).onSubscribe(any(Disposable.class)); + verify(observer).onError(checkedException); + verifyNoMoreInteractions(observer); + } + + @Test + public void fusedFlatMapExecution() { + final int[] calls = { 0 }; + + Observable.just(1).flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return ++calls[0]; + } + }); + } + }) + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @Test + public void fusedFlatMapExecutionHidden() { + final int[] calls = { 0 }; + + Observable.just(1).hide().flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return ++calls[0]; + } + }); + } + }) + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @Test + public void fusedFlatMapNull() { + Observable.just(1).flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return null; + } + }); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void fusedFlatMapNullHidden() { + Observable.just(1).hide().flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) + throws Exception { + return Observable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return null; + } + }); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void disposedOnArrival() { + final int[] count = { 0 }; + Observable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + count[0]++; + return 1; + } + }) + .test(true) + .assertEmpty(); + + assertEquals(0, count[0]); + } + + @Test + public void disposedOnCall() { + final TestObserver to = new TestObserver(); + + Observable.fromSupplier(new Supplier() { + @Override + public Integer get() throws Exception { + to.dispose(); + return 1; + } + }) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void disposedOnCallThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + final TestObserver to = new TestObserver(); + + Observable.fromSupplier(new Supplier() { + @Override + public Integer get() throws Exception { + to.dispose(); + throw new TestException(); + } + }) + .subscribe(to); + + to.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void take() { + Observable.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return 1; + } + }) + .take(1) + .test() + .assertResult(1); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java index 41b9ce523e..7f54a9e594 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFromCallableTest.java @@ -44,8 +44,8 @@ public void fromCallableValue() { return 5; } }) - .test() - .assertResult(5); + .test() + .assertResult(5); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java new file mode 100644 index 0000000000..b0cda6668e --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java @@ -0,0 +1,275 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Supplier; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.testsupport.TestHelper; + +public class SingleFromSupplierTest { + + @Test + public void fromCallableValue() { + Single.fromSupplier(new Supplier() { + @Override public Integer get() throws Exception { + return 5; + } + }) + .test() + .assertResult(5); + } + + @Test + public void fromSupplierError() { + Single.fromSupplier(new Supplier() { + @Override public Integer get() throws Exception { + throw new UnsupportedOperationException(); + } + }) + .test() + .assertFailure(UnsupportedOperationException.class); + } + + @Test + public void fromSupplierNull() { + Single.fromSupplier(new Supplier() { + @Override public Integer get() throws Exception { + return null; + } + }) + .to(TestHelper.testConsumer()) + .assertFailureAndMessage(NullPointerException.class, "The supplier returned a null value"); + } + + @Test + public void fromSupplierTwice() { + final AtomicInteger atomicInteger = new AtomicInteger(); + + Supplier callable = new Supplier() { + @Override + public Integer get() throws Exception { + return atomicInteger.incrementAndGet(); + } + }; + + Single.fromSupplier(callable) + .test() + .assertResult(1); + + assertEquals(1, atomicInteger.get()); + + Single.fromSupplier(callable) + .test() + .assertResult(2); + + assertEquals(2, atomicInteger.get()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotInvokeFuncUntilSubscription() throws Throwable { + Supplier func = mock(Supplier.class); + + when(func.get()).thenReturn(new Object()); + + Single fromSupplierSingle = Single.fromSupplier(func); + + verifyZeroInteractions(func); + + fromSupplierSingle.subscribe(); + + verify(func).get(); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestObserver to = Single.fromSupplier(new Supplier() { + @Override + public Integer get() throws Exception { + cdl1.countDown(); + cdl2.await(5, TimeUnit.SECONDS); + return 1; + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + to.dispose(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertUndeliverable(errors, 0, InterruptedException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Throwable { + Supplier func = mock(Supplier.class); + + final CountDownLatch funcLatch = new CountDownLatch(1); + final CountDownLatch observerLatch = new CountDownLatch(1); + + when(func.get()).thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + observerLatch.countDown(); + + try { + funcLatch.await(); + } catch (InterruptedException e) { + // It's okay, unsubscription causes Thread interruption + + // Restoring interruption status of the Thread + Thread.currentThread().interrupt(); + } + + return "should_not_be_delivered"; + } + }); + + Single fromSupplierObservable = Single.fromSupplier(func); + + Observer observer = TestHelper.mockObserver(); + + TestObserver outer = new TestObserver(observer); + + fromSupplierObservable + .subscribeOn(Schedulers.computation()) + .subscribe(outer); + + // Wait until func will be invoked + observerLatch.await(); + + // Unsubscribing before emission + outer.dispose(); + + // Emitting result + funcLatch.countDown(); + + // func must be invoked + verify(func).get(); + + // Observer must not be notified at all + verify(observer).onSubscribe(any(Disposable.class)); + verifyNoMoreInteractions(observer); + } + + @Test + public void shouldAllowToThrowCheckedException() { + final Exception checkedException = new Exception("test exception"); + + Single fromSupplierObservable = Single.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + throw checkedException; + } + }); + + SingleObserver observer = TestHelper.mockSingleObserver(); + + fromSupplierObservable.subscribe(observer); + + verify(observer).onSubscribe(any(Disposable.class)); + verify(observer).onError(checkedException); + verifyNoMoreInteractions(observer); + } + + @Test + public void disposedOnArrival() { + final int[] count = { 0 }; + Single.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + count[0]++; + return 1; + } + }) + .test(true) + .assertEmpty(); + + assertEquals(0, count[0]); + } + + @Test + public void disposedOnCall() { + final TestObserver to = new TestObserver(); + + Single.fromSupplier(new Supplier() { + @Override + public Integer get() throws Exception { + to.dispose(); + return 1; + } + }) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void toObservableTake() { + Single.fromSupplier(new Supplier() { + @Override + public Object get() throws Exception { + return 1; + } + }) + .toObservable() + .take(1) + .test() + .assertResult(1); + } + + @Test + public void toObservableAndBack() { + Single.fromSupplier(new Supplier() { + @Override + public Integer get() throws Exception { + return 1; + } + }) + .toObservable() + .singleOrError() + .test() + .assertResult(1); + } +} diff --git a/src/test/java/io/reactivex/tck/FromCallableTckTest.java b/src/test/java/io/reactivex/tck/FromCallableTckTest.java index 7131e0f88c..12e46a06c5 100644 --- a/src/test/java/io/reactivex/tck/FromCallableTckTest.java +++ b/src/test/java/io/reactivex/tck/FromCallableTckTest.java @@ -19,6 +19,7 @@ import org.testng.annotations.Test; import io.reactivex.Flowable; +import io.reactivex.exceptions.TestException; @Test public class FromCallableTckTest extends BaseTck { @@ -36,6 +37,19 @@ public Long call() throws Exception { ; } + @Override + public Publisher createFailedPublisher() { + return + Flowable.fromCallable(new Callable() { + @Override + public Long call() throws Exception { + throw new TestException(); + } + } + ) + ; + } + @Override public long maxElementsFromPublisher() { return 1; diff --git a/src/test/java/io/reactivex/tck/FromSupplierTckTest.java b/src/test/java/io/reactivex/tck/FromSupplierTckTest.java new file mode 100644 index 0000000000..0878adc619 --- /dev/null +++ b/src/test/java/io/reactivex/tck/FromSupplierTckTest.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.Flowable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Supplier; + +@Test +public class FromSupplierTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.fromSupplier(new Supplier() { + @Override + public Long get() throws Throwable { + return 1L; + } + } + ) + ; + } + + @Override + public Publisher createFailedPublisher() { + return + Flowable.fromSupplier(new Supplier() { + @Override + public Long get() throws Throwable { + throw new TestException(); + } + } + ) + ; + } + + @Override + public long maxElementsFromPublisher() { + return 1; + } +} From 59d1151ceebafd0647932246c2db32bd69bfe38d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 20 Jun 2019 20:51:31 +0200 Subject: [PATCH 2/2] Correct some missing callable-supplier name changes --- src/main/java/io/reactivex/Completable.java | 2 +- src/main/java/io/reactivex/Maybe.java | 2 +- .../internal/operators/maybe/MaybeFromSupplier.java | 2 +- .../operators/observable/ObservableFromSupplier.java | 4 ++-- .../internal/operators/maybe/MaybeFromSupplierTest.java | 8 ++++---- .../internal/operators/single/SingleFromSupplierTest.java | 8 ++++---- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index b618650d5d..da9a6d4253 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -637,7 +637,7 @@ public static Completable fromSingle(final SingleSource single) { @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static Completable fromSupplier(final Supplier supplier) { - ObjectHelper.requireNonNull(supplier, "callable is null"); + ObjectHelper.requireNonNull(supplier, "supplier is null"); return RxJavaPlugins.onAssembly(new CompletableFromSupplier(supplier)); } diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 72954d89d0..ffc042e178 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -875,7 +875,7 @@ public static Maybe fromRunnable(final Runnable run) { *

* This operator allows you to defer the execution of the given {@code Supplier} until a {@code MaybeObserver} * subscribes to the returned {@link Maybe}. In other terms, this source operator evaluates the given - * {@code Callable} "lazily". + * {@code Supplier} "lazily". *

* Note that the {@code null} handling of this operator differs from the similar source operators in the other * {@link io.reactivex base reactive classes}. Those operators signal a {@code NullPointerException} if the value returned by their diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java index fa2e9c1cd4..2f33813898 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSupplier.java @@ -20,7 +20,7 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * Executes a callable and signals its value as success or signals an exception. + * Executes a supplier and signals its value as success or signals an exception. * * @param the value type * @since 3.0.0 diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java index 2ba55b6f36..88791276c8 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromSupplier.java @@ -21,7 +21,7 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * Calls a Callable and emits its resulting single value or signals its exception. + * Calls a Supplier and emits its resulting single value or signals its exception. * @param the value type * @since 3.0.0 */ @@ -57,6 +57,6 @@ public void subscribeActual(Observer observer) { @Override public T get() throws Throwable { - return ObjectHelper.requireNonNull(supplier.get(), "The callable returned a null value"); + return ObjectHelper.requireNonNull(supplier.get(), "The supplier returned a null value"); } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java index a281af9f4b..fdb91888b3 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSupplierTest.java @@ -61,7 +61,7 @@ public Object get() throws Exception { public void fromSupplierTwice() { final AtomicInteger atomicInteger = new AtomicInteger(); - Supplier callable = new Supplier() { + Supplier supplier = new Supplier() { @Override public Object get() throws Exception { atomicInteger.incrementAndGet(); @@ -69,13 +69,13 @@ public Object get() throws Exception { } }; - Maybe.fromSupplier(callable) + Maybe.fromSupplier(supplier) .test() .assertResult(); assertEquals(1, atomicInteger.get()); - Maybe.fromSupplier(callable) + Maybe.fromSupplier(supplier) .test() .assertResult(); @@ -117,7 +117,7 @@ public Object get() throws Exception { @SuppressWarnings("unchecked") @Test - public void callable() throws Throwable { + public void supplier() throws Throwable { final int[] counter = { 0 }; Maybe m = Maybe.fromSupplier(new Supplier() { diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java index b0cda6668e..6c823a44bb 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFromSupplierTest.java @@ -36,7 +36,7 @@ public class SingleFromSupplierTest { @Test - public void fromCallableValue() { + public void fromSupplierValue() { Single.fromSupplier(new Supplier() { @Override public Integer get() throws Exception { return 5; @@ -72,20 +72,20 @@ public void fromSupplierNull() { public void fromSupplierTwice() { final AtomicInteger atomicInteger = new AtomicInteger(); - Supplier callable = new Supplier() { + Supplier supplier = new Supplier() { @Override public Integer get() throws Exception { return atomicInteger.incrementAndGet(); } }; - Single.fromSupplier(callable) + Single.fromSupplier(supplier) .test() .assertResult(1); assertEquals(1, atomicInteger.get()); - Single.fromSupplier(callable) + Single.fromSupplier(supplier) .test() .assertResult(2);