Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding delayError to Maybe.delay #6864

Merged
merged 1 commit into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 59 additions & 6 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2672,6 +2672,7 @@ public final Single<T> defaultIfEmpty(@NonNull T defaultItem) {
/**
* Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a
* specified delay.
* An error signal will not be delayed.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
* <dl>
Expand All @@ -2682,17 +2683,68 @@ public final Single<T> defaultIfEmpty(@NonNull T defaultItem) {
* @param time
* the delay to shift the source by
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* the {@link TimeUnit} in which {@code time} is defined
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code unit} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
* @see #delay(long, TimeUnit, Scheduler)
* @see #delay(long, TimeUnit, Scheduler, boolean)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@NonNull
public final Maybe<T> delay(long time, @NonNull TimeUnit unit) {
return delay(time, unit, Schedulers.computation());
return delay(time, unit, Schedulers.computation(), false);
}

/**
* Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a
* specified delay.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param time the delay to shift the source by
* @param unit the {@link TimeUnit} in which {@code time} is defined
* @param delayError if {@code true}, both success and error signals are delayed. if {@code false}, only success signals are delayed.
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code unit} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
* @see #delay(long, TimeUnit, Scheduler, boolean)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@NonNull
public final Maybe<T> delay(long time, @NonNull TimeUnit unit, boolean delayError) {
return delay(time, unit, Schedulers.computation(), delayError);
}

/**
* Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a
* specified delay.
* An error signal will not be delayed.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify the {@link Scheduler} where the non-blocking wait and emission happens</dd>
* </dl>
*
* @param time the delay to shift the source by
* @param unit the {@link TimeUnit} in which {@code time} is defined
* @param scheduler the {@code Scheduler} to use for delaying
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
* @see #delay(long, TimeUnit, Scheduler, boolean)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Maybe<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
return delay(time, unit, scheduler, false);
}

/**
Expand All @@ -2708,20 +2760,21 @@ public final Maybe<T> delay(long time, @NonNull TimeUnit unit) {
* @param time
* the delay to shift the source by
* @param unit
* the time unit of {@code delay}
* the {@link TimeUnit} in which {@code time} is defined
* @param scheduler
* the {@code Scheduler} to use for delaying
* @param delayError if {@code true}, both success and error signals are delayed. if {@code false}, only success signals are delayed.
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Maybe<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
public final Maybe<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean delayError) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new MaybeDelay<>(this, Math.max(0L, time), unit, scheduler));
return RxJavaPlugins.onAssembly(new MaybeDelay<>(this, Math.max(0L, time), unit, scheduler, delayError));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@ public final class MaybeDelay<T> extends AbstractMaybeWithUpstream<T, T> {

final Scheduler scheduler;

public MaybeDelay(MaybeSource<T> source, long delay, TimeUnit unit, Scheduler scheduler) {
final boolean delayError;

public MaybeDelay(MaybeSource<T> source, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
super(source);
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
this.delayError = delayError;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new DelayMaybeObserver<>(observer, delay, unit, scheduler));
source.subscribe(new DelayMaybeObserver<>(observer, delay, unit, scheduler, delayError));
}

static final class DelayMaybeObserver<T>
Expand All @@ -59,15 +62,18 @@ static final class DelayMaybeObserver<T>

final Scheduler scheduler;

final boolean delayError;

T value;

Throwable error;

DelayMaybeObserver(MaybeObserver<? super T> actual, long delay, TimeUnit unit, Scheduler scheduler) {
DelayMaybeObserver(MaybeObserver<? super T> actual, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
this.downstream = actual;
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
this.delayError = delayError;
}

@Override
Expand Down Expand Up @@ -105,21 +111,21 @@ public void onSubscribe(Disposable d) {
@Override
public void onSuccess(T value) {
this.value = value;
schedule();
schedule(delay);
}

@Override
public void onError(Throwable e) {
this.error = e;
schedule();
schedule(delayError ? delay : 0);
}

@Override
public void onComplete() {
schedule();
schedule(delay);
}

void schedule() {
void schedule(long delay) {
DisposableHelper.replace(this, scheduler.scheduleDirect(this, delay, unit));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,32 @@ public Maybe<Object> apply(Maybe<Object> f) throws Exception {
}
});
}

@Test
public void delayedErrorOnSuccess() {
final TestScheduler scheduler = new TestScheduler();
final TestObserver<Integer> observer = Maybe.just(1)
.delay(5, TimeUnit.SECONDS, scheduler, true)
.test();

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
observer.assertNoValues();

scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
observer.assertValue(1);
}

@Test
public void delayedErrorOnError() {
final TestScheduler scheduler = new TestScheduler();
final TestObserver<?> observer = Maybe.error(new TestException())
.delay(5, TimeUnit.SECONDS, scheduler, true)
.test();

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
observer.assertNoErrors();

scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
observer.assertError(TestException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ public void checkParallelFlowable() {
// negative time is considered as zero time
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class));
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class));
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Boolean.TYPE));
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE));

// zero repeat is allowed
addOverride(new ParamOverride(Maybe.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE));
Expand Down