+ * If the current {@code Single} fails, the resulting {@code Single} will
+ * pass along the signal to the downstream. To measure the time to error,
+ * use {@link #materialize()} and apply {@link #timeInterval(Scheduler)}.
+ *
> timeInterval(@NonNull Scheduler scheduler) {
+ return timeInterval(TimeUnit.MILLISECONDS, scheduler);
+ }
+
+ /**
+ * Measures the time between the subscription and success item emission
+ * of the current {@code Single} and signals it as a tuple ({@link Timed})
+ * success value.
+ *
+ *
+ *
+ * If the current {@code Single} fails, the resulting {@code Single} will
+ * pass along the signals to the downstream. To measure the time to error,
+ * use {@link #materialize()} and apply {@link #timeInterval(TimeUnit, Scheduler)}.
+ *
+ * - Scheduler:
+ * - {@code timeInterval} uses the {@code computation} {@link Scheduler}
+ * for determining the current time upon subscription and upon receiving the
+ * success item from the current {@code Single}.
+ *
+ * @param unit the time unit for measurement
+ * @return the new {@code Single} instance
+ * @throws NullPointerException if {@code unit} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.COMPUTATION)
+ public final Single> timeInterval(@NonNull TimeUnit unit) {
+ return timeInterval(unit, Schedulers.computation());
+ }
+
+ /**
+ * Measures the time between the subscription and success item emission
+ * of the current {@code Single} and signals it as a tuple ({@link Timed})
+ * success value.
+ *
+ *
+ *
+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will
+ * pass along the signals to the downstream. To measure the time to termination,
+ * use {@link #materialize()} and apply {@link #timeInterval(TimeUnit, Scheduler)}.
+ *
+ * - Scheduler:
+ * - {@code timeInterval} uses the provided {@link Scheduler}
+ * for determining the current time upon subscription and upon receiving the
+ * success item from the current {@code Single}.
+ *
+ * @param unit the time unit for measurement
+ * @param scheduler the {@code Scheduler} used for providing the current time
+ * @return the new {@code Single} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ public final Single> timeInterval(@NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ return RxJavaPlugins.onAssembly(new SingleTimeInterval<>(this, unit, scheduler, true));
+ }
+
+ /**
+ * Combines the success value from the current {@code Single} with the current time (in milliseconds) of
+ * its reception, using the {@code computation} {@link Scheduler} as time source,
+ * then signals them as a {@link Timed} instance.
+ *
+ *
+ *
+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will
+ * pass along the signals to the downstream. To get the timestamp of the error,
+ * use {@link #materialize()} and apply {@link #timestamp()}.
+ *
+ * - Scheduler:
+ * - {@code timestamp} uses the {@code computation} {@code Scheduler}
+ * for determining the current time upon receiving the
+ * success item from the current {@code Single}.
+ *
+ * @return the new {@code Single} instance
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.COMPUTATION)
+ public final Single> timestamp() {
+ return timestamp(TimeUnit.MILLISECONDS, Schedulers.computation());
+ }
+
+ /**
+ * Combines the success value from the current {@code Single} with the current time (in milliseconds) of
+ * its reception, using the given {@link Scheduler} as time source,
+ * then signals them as a {@link Timed} instance.
+ *
+ *
+ *
+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will
+ * pass along the signals to the downstream. To get the timestamp of the error,
+ * use {@link #materialize()} and apply {@link #timestamp(Scheduler)}.
+ *
+ * - Scheduler:
+ * - {@code timestamp} uses the provided {@code Scheduler}
+ * for determining the current time upon receiving the
+ * success item from the current {@code Single}.
+ *
+ * @param scheduler the {@code Scheduler} used for providing the current time
+ * @return the new {@code Single} instance
+ * @throws NullPointerException if {@code scheduler} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ public final Single> timestamp(@NonNull Scheduler scheduler) {
+ return timestamp(TimeUnit.MILLISECONDS, scheduler);
+ }
+
+ /**
+ * Combines the success value from the current {@code Single} with the current time of
+ * its reception, using the {@code computation} {@link Scheduler} as time source,
+ * then signals it as a {@link Timed} instance.
+ *
+ *
+ *
+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will
+ * pass along the signals to the downstream. To get the timestamp of the error,
+ * use {@link #materialize()} and apply {@link #timestamp(TimeUnit)}.
+ *
+ * - Scheduler:
+ * - {@code timestamp} uses the {@code computation} {@code Scheduler},
+ * for determining the current time upon receiving the
+ * success item from the current {@code Single}.
+ *
+ * @param unit the time unit for measurement
+ * @return the new {@code Single} instance
+ * @throws NullPointerException if {@code unit} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.COMPUTATION)
+ public final Single> timestamp(@NonNull TimeUnit unit) {
+ return timestamp(unit, Schedulers.computation());
+ }
+
+ /**
+ * Combines the success value from the current {@code Single} with the current time of
+ * its reception, using the given {@link Scheduler} as time source,
+ * then signals it as a {@link Timed} instance.
+ *
+ *
+ *
+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will
+ * pass along the signals to the downstream. To get the timestamp of the error,
+ * use {@link #materialize()} and apply {@link #timestamp(TimeUnit, Scheduler)}.
+ *
+ * - Scheduler:
+ * - {@code timestamp} uses the provided {@code Scheduler},
+ * which is used for determining the current time upon receiving the
+ * success item from the current {@code Single}.
+ *
+ * @param unit the time unit for measurement
+ * @param scheduler the {@code Scheduler} used for providing the current time
+ * @return the new {@code Single} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ public final Single> timestamp(@NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ return RxJavaPlugins.onAssembly(new SingleTimeInterval<>(this, unit, scheduler, false));
+ }
+
/**
* Returns a {@code Single} that emits the item emitted by the current {@code Single} until a {@link CompletableSource} terminates. Upon
* termination of {@code other}, this will emit a {@link CancellationException} rather than go to
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeInterval.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeInterval.java
new file mode 100644
index 0000000000..af30ee3a1d
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeInterval.java
@@ -0,0 +1,105 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.maybe;
+
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+import io.reactivex.rxjava3.schedulers.Timed;
+
+/**
+ * Measures the time between subscription and the success item emission
+ * from the upstream and emits this as a {@link Timed} success value.
+ * @param the element type of the sequence
+ * @since 3.0.0
+ */
+public final class MaybeTimeInterval extends Maybe> {
+
+ final MaybeSource source;
+
+ final TimeUnit unit;
+
+ final Scheduler scheduler;
+
+ final boolean start;
+
+ public MaybeTimeInterval(MaybeSource source, TimeUnit unit, Scheduler scheduler, boolean start) {
+ this.source = source;
+ this.unit = unit;
+ this.scheduler = scheduler;
+ this.start = start;
+ }
+
+ @Override
+ protected void subscribeActual(@NonNull MaybeObserver super @NonNull Timed> observer) {
+ source.subscribe(new TimeIntervalMaybeObserver<>(observer, unit, scheduler, start));
+ }
+
+ static final class TimeIntervalMaybeObserver implements MaybeObserver, Disposable {
+
+ final MaybeObserver super Timed> downstream;
+
+ final TimeUnit unit;
+
+ final Scheduler scheduler;
+
+ final long startTime;
+
+ Disposable upstream;
+
+ TimeIntervalMaybeObserver(MaybeObserver super Timed> downstream, TimeUnit unit, Scheduler scheduler, boolean start) {
+ this.downstream = downstream;
+ this.unit = unit;
+ this.scheduler = scheduler;
+ this.startTime = start ? scheduler.now(unit) : 0L;
+ }
+
+ @Override
+ public void onSubscribe(@NonNull Disposable d) {
+ if (DisposableHelper.validate(this.upstream, d)) {
+ this.upstream = d;
+
+ downstream.onSubscribe(this);
+ }
+ }
+
+ @Override
+ public void onSuccess(@NonNull T t) {
+ downstream.onSuccess(new Timed<>(t, scheduler.now(unit) - startTime, unit));
+ }
+
+ @Override
+ public void onError(@NonNull Throwable e) {
+ downstream.onError(e);
+ }
+
+ @Override
+ public void onComplete() {
+ downstream.onComplete();
+ }
+
+ @Override
+ public void dispose() {
+ upstream.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return upstream.isDisposed();
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeInterval.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeInterval.java
new file mode 100644
index 0000000000..2eac3bbc3a
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeInterval.java
@@ -0,0 +1,100 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.single;
+
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+import io.reactivex.rxjava3.schedulers.Timed;
+
+/**
+ * Measures the time between subscription and the success item emission
+ * from the upstream and emits this as a {@link Timed} success value.
+ * @param the element type of the sequence
+ * @since 3.0.0
+ */
+public final class SingleTimeInterval extends Single> {
+
+ final SingleSource source;
+
+ final TimeUnit unit;
+
+ final Scheduler scheduler;
+
+ final boolean start;
+
+ public SingleTimeInterval(SingleSource source, TimeUnit unit, Scheduler scheduler, boolean start) {
+ this.source = source;
+ this.unit = unit;
+ this.scheduler = scheduler;
+ this.start = start;
+ }
+
+ @Override
+ protected void subscribeActual(@NonNull SingleObserver super @NonNull Timed> observer) {
+ source.subscribe(new TimeIntervalSingleObserver<>(observer, unit, scheduler, start));
+ }
+
+ static final class TimeIntervalSingleObserver implements SingleObserver, Disposable {
+
+ final SingleObserver super Timed> downstream;
+
+ final TimeUnit unit;
+
+ final Scheduler scheduler;
+
+ final long startTime;
+
+ Disposable upstream;
+
+ TimeIntervalSingleObserver(SingleObserver super Timed> downstream, TimeUnit unit, Scheduler scheduler, boolean start) {
+ this.downstream = downstream;
+ this.unit = unit;
+ this.scheduler = scheduler;
+ this.startTime = start ? scheduler.now(unit) : 0L;
+ }
+
+ @Override
+ public void onSubscribe(@NonNull Disposable d) {
+ if (DisposableHelper.validate(this.upstream, d)) {
+ this.upstream = d;
+
+ downstream.onSubscribe(this);
+ }
+ }
+
+ @Override
+ public void onSuccess(@NonNull T t) {
+ downstream.onSuccess(new Timed<>(t, scheduler.now(unit) - startTime, unit));
+ }
+
+ @Override
+ public void onError(@NonNull Throwable e) {
+ downstream.onError(e);
+ }
+
+ @Override
+ public void dispose() {
+ upstream.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return upstream.isDisposed();
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java b/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java
index 814b223dcf..a7b6cc46e7 100644
--- a/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java
+++ b/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java
@@ -33,10 +33,10 @@ public final class Timed {
* @param value the value to hold
* @param time the time to hold
* @param unit the time unit, not null
- * @throws NullPointerException if unit is {@code null}
+ * @throws NullPointerException if {@code value} or {@code unit} is {@code null}
*/
public Timed(@NonNull T value, long time, @NonNull TimeUnit unit) {
- this.value = value;
+ this.value = Objects.requireNonNull(value, "value is null");
this.time = time;
this.unit = Objects.requireNonNull(unit, "unit is null");
}
@@ -89,7 +89,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
- int h = value != null ? value.hashCode() : 0;
+ int h = value.hashCode();
h = h * 31 + (int)((time >>> 31) ^ time);
h = h * 31 + unit.hashCode();
return h;
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeIntervalTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeIntervalTest.java
new file mode 100644
index 0000000000..30b770aed4
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeIntervalTest.java
@@ -0,0 +1,111 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.maybe;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.schedulers.*;
+import io.reactivex.rxjava3.subjects.MaybeSubject;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class MaybeTimeIntervalTest {
+
+ @Test
+ public void just() {
+ Maybe.just(1)
+ .timeInterval()
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void empty() {
+ Maybe.empty()
+ .timeInterval()
+ .test()
+ .assertResult();
+ }
+
+ @Test
+ public void error() {
+ Maybe.error(new TestException())
+ .timeInterval()
+ .test()
+ .assertFailure(TestException.class);
+ }
+
+ @Test
+ public void justSeconds() {
+ Maybe.just(1)
+ .timeInterval(TimeUnit.SECONDS)
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void justScheduler() {
+ Maybe.just(1)
+ .timeInterval(Schedulers.single())
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void justSecondsScheduler() {
+ Maybe.just(1)
+ .timeInterval(TimeUnit.SECONDS, Schedulers.single())
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void doubleOnSubscribe() {
+ TestHelper.checkDoubleOnSubscribeMaybe(m -> m.timeInterval());
+ }
+
+ @Test
+ public void dispose() {
+ TestHelper.checkDisposed(MaybeSubject.create().timeInterval());
+ }
+
+ @Test
+ public void timeInfo() {
+ TestScheduler scheduler = new TestScheduler();
+
+ MaybeSubject ms = MaybeSubject.create();
+
+ TestObserver> to = ms
+ .timeInterval(scheduler)
+ .test();
+
+ scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
+
+ ms.onSuccess(1);
+
+ to.assertResult(new Timed<>(1, 1000L, TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimestampTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimestampTest.java
new file mode 100644
index 0000000000..1ce230f8d0
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimestampTest.java
@@ -0,0 +1,111 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.maybe;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.schedulers.*;
+import io.reactivex.rxjava3.subjects.MaybeSubject;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class MaybeTimestampTest {
+
+ @Test
+ public void just() {
+ Maybe.just(1)
+ .timestamp()
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void empty() {
+ Maybe.empty()
+ .timestamp()
+ .test()
+ .assertResult();
+ }
+
+ @Test
+ public void error() {
+ Maybe.error(new TestException())
+ .timestamp()
+ .test()
+ .assertFailure(TestException.class);
+ }
+
+ @Test
+ public void justSeconds() {
+ Maybe.just(1)
+ .timestamp(TimeUnit.SECONDS)
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void justScheduler() {
+ Maybe.just(1)
+ .timestamp(Schedulers.single())
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void justSecondsScheduler() {
+ Maybe.just(1)
+ .timestamp(TimeUnit.SECONDS, Schedulers.single())
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void doubleOnSubscribe() {
+ TestHelper.checkDoubleOnSubscribeMaybe(m -> m.timestamp());
+ }
+
+ @Test
+ public void dispose() {
+ TestHelper.checkDisposed(MaybeSubject.create().timestamp());
+ }
+
+ @Test
+ public void timeInfo() {
+ TestScheduler scheduler = new TestScheduler();
+
+ MaybeSubject ms = MaybeSubject.create();
+
+ TestObserver> to = ms
+ .timestamp(scheduler)
+ .test();
+
+ scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
+
+ ms.onSuccess(1);
+
+ to.assertResult(new Timed<>(1, 1000L, TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeIntervalTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeIntervalTest.java
new file mode 100644
index 0000000000..09dd26eeba
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeIntervalTest.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.single;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.schedulers.*;
+import io.reactivex.rxjava3.subjects.SingleSubject;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class SingleTimeIntervalTest {
+
+ @Test
+ public void just() {
+ Single.just(1)
+ .timestamp()
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void error() {
+ Single.error(new TestException())
+ .timestamp()
+ .test()
+ .assertFailure(TestException.class);
+ }
+
+ @Test
+ public void justSeconds() {
+ Single.just(1)
+ .timestamp(TimeUnit.SECONDS)
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void justScheduler() {
+ Single.just(1)
+ .timestamp(Schedulers.single())
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void justSecondsScheduler() {
+ Single.just(1)
+ .timestamp(TimeUnit.SECONDS, Schedulers.single())
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void doubleOnSubscribe() {
+ TestHelper.checkDoubleOnSubscribeSingle(m -> m.timestamp());
+ }
+
+ @Test
+ public void dispose() {
+ TestHelper.checkDisposed(SingleSubject.create().timestamp());
+ }
+
+ @Test
+ public void timeInfo() {
+ TestScheduler scheduler = new TestScheduler();
+
+ SingleSubject ss = SingleSubject.create();
+
+ TestObserver> to = ss
+ .timestamp(scheduler)
+ .test();
+
+ scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
+
+ ss.onSuccess(1);
+
+ to.assertResult(new Timed<>(1, 1000L, TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimestampTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimestampTest.java
new file mode 100644
index 0000000000..a282c0345e
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimestampTest.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.single;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.schedulers.*;
+import io.reactivex.rxjava3.subjects.SingleSubject;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class SingleTimestampTest {
+
+ @Test
+ public void just() {
+ Single.just(1)
+ .timeInterval()
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void error() {
+ Single.error(new TestException())
+ .timeInterval()
+ .test()
+ .assertFailure(TestException.class);
+ }
+
+ @Test
+ public void justSeconds() {
+ Single.just(1)
+ .timeInterval(TimeUnit.SECONDS)
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void justScheduler() {
+ Single.just(1)
+ .timeInterval(Schedulers.single())
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void justSecondsScheduler() {
+ Single.just(1)
+ .timeInterval(TimeUnit.SECONDS, Schedulers.single())
+ .test()
+ .assertValueCount(1)
+ .assertNoErrors()
+ .assertComplete();
+ }
+
+ @Test
+ public void doubleOnSubscribe() {
+ TestHelper.checkDoubleOnSubscribeSingle(m -> m.timeInterval());
+ }
+
+ @Test
+ public void dispose() {
+ TestHelper.checkDisposed(SingleSubject.create().timeInterval());
+ }
+
+ @Test
+ public void timeInfo() {
+ TestScheduler scheduler = new TestScheduler();
+
+ SingleSubject ss = SingleSubject.create();
+
+ TestObserver> to = ss
+ .timeInterval(scheduler)
+ .test();
+
+ scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
+
+ ss.onSuccess(1);
+
+ to.assertResult(new Timed<>(1, 1000L, TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java b/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java
index 60b5a11fb6..a0ba3e52e4 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java
@@ -458,6 +458,7 @@ static String findNotes(String clazzName, String operatorName) {
" C throttleLatest Always empty thus no items to work with.",
" MS throttleWithTimeout At most one item signaled so no subsequent items to work with.",
" C throttleWithTimeout Always empty thus no items to work with.",
+ " C timeInterval Always empty thus no items to work with.",
" C timestamp Always empty thus no items to work with.",
"FO toCompletionStage Use [`firstStage`](#firstStage), [`lastStage`](#lastStage) or [`singleStage`](#singleStage).",
"F toFlowable Would be no-op.",
diff --git a/src/test/java/io/reactivex/rxjava3/schedulers/TimedTest.java b/src/test/java/io/reactivex/rxjava3/schedulers/TimedTest.java
index d7a5eb752e..ce117c2fb7 100644
--- a/src/test/java/io/reactivex/rxjava3/schedulers/TimedTest.java
+++ b/src/test/java/io/reactivex/rxjava3/schedulers/TimedTest.java
@@ -39,7 +39,7 @@ public void hashCodeOf() {
assertEquals(TimeUnit.SECONDS.hashCode() + 31 * (5 + 31 * 1), t1.hashCode());
- Timed t2 = new Timed<>(null, 5, TimeUnit.SECONDS);
+ Timed t2 = new Timed<>(0, 5, TimeUnit.SECONDS);
assertEquals(TimeUnit.SECONDS.hashCode() + 31 * (5 + 31 * 0), t2.hashCode());
}
diff --git a/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java b/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java
index d351e7809d..e22fdd9521 100644
--- a/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java
+++ b/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java
@@ -153,12 +153,12 @@ public void maybeDocRefersToMaybeTypes() throws Exception {
jdx = 0;
for (;;) {
int idx = m.javadoc.indexOf("Single", jdx);
- if (idx >= 0) {
+ if (idx >= 0 && m.javadoc.indexOf("Single#", jdx) != idx) {
int j = m.javadoc.indexOf("#toSingle", jdx);
int k = m.javadoc.indexOf("{@code Single", jdx);
if (!m.signature.contains("Single") && (j + 3 != idx && k + 7 != idx)) {
e.append("java.lang.RuntimeException: Maybe doc mentions Single but not in the signature\r\n at io.reactivex.rxjava3.core.")
- .append("Maybe(Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n");
+ .append("Maybe.method(Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n");
}
jdx = idx + 6;
} else {