From da75c60b30a66efd683626290e0d193a339b065c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Sun, 5 Jun 2016 22:31:20 -0700 Subject: [PATCH 1/3] Bump to RxJava 1.1.5 - Bump to RxJava 1.1.5 - Add missing operators --- build.sbt | 2 +- .../src/test/scala/examples/RxScalaDemo.scala | 134 +++++++++- src/main/scala/rx/lang/scala/Observable.scala | 236 +++++++++++++++++- .../ObservableCompletenessKit.scala | 17 +- 4 files changed, 383 insertions(+), 6 deletions(-) diff --git a/build.sbt b/build.sbt index 5e9b7773..a625ab16 100644 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4") parallelExecution in Test := false libraryDependencies ++= Seq( - "io.reactivex" % "rxjava" % "1.1.1", + "io.reactivex" % "rxjava" % "1.1.5", "org.mockito" % "mockito-core" % "1.9.5" % "test", "junit" % "junit" % "4.11" % "test", "org.scalatest" %% "scalatest" % "2.2.6" % "test") diff --git a/examples/src/test/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/examples/RxScalaDemo.scala index e1f677a2..72c0cd2a 100644 --- a/examples/src/test/scala/examples/RxScalaDemo.scala +++ b/examples/src/test/scala/examples/RxScalaDemo.scala @@ -409,13 +409,28 @@ class RxScalaDemo extends JUnitSuite { val firstCounter = Observable.interval(250 millis) val secondCounter = Observable.interval(550 millis) val thirdCounter = Observable.interval(850 millis) - val sources = Seq(firstCounter, secondCounter, thirdCounter) + val sources = Iterable(firstCounter, secondCounter, thirdCounter) val combinedCounter = Observable.combineLatest(sources)(_.toList).take(10) combinedCounter subscribe {x => println(s"Emitted group: $x")} waitFor(combinedCounter) } + @Test def combineLatestDelayErrorExample() { + val firstCounter = Observable.just(1) ++ Observable.error(new RuntimeException("Oops!")) + val secondCounter = Observable.interval(550 millis).take(3) + val sources = Iterable(firstCounter, secondCounter) + + Observable.combineLatest(sources)(_.toList).subscribe( + i => println("combineLatest: " + i), + e => println("combineLatest: " + e.getMessage)) + Thread.sleep(2000) + Observable.combineLatestDelayError(sources)(_.toList).subscribe( + i => println("combineLatestDelayError: " + i), + e => println("combineLatestDelayError: " + e.getMessage)) + Thread.sleep(2000) + } + @Test def olympicsExampleWithoutPublish() { val medals = Olympics.mountainBikeMedals.doOnEach(_ => println("onNext")) medals.subscribe(println(_)) // triggers an execution of medals Observable @@ -1489,6 +1504,20 @@ class RxScalaDemo extends JUnitSuite { .toBlocking.foreach(println) } + @Test def concatMapDelayErrorExample() { + println("=== concatMap ===") + (1 to 10).toObservable + .concatMap { i => + if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i) + }.subscribe(println(_), _.printStackTrace) + + println("=== concatMapDelayError ===") + (1 to 10).toObservable + .concatMapDelayError{ i => + if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i) + }.subscribe(println(_), _.printStackTrace) + } + @Test def onErrorResumeNextExample() { val o = Observable { (subscriber: Subscriber[Int]) => @@ -1512,6 +1541,43 @@ class RxScalaDemo extends JUnitSuite { }.subscribe(println(_)) } + @Test def switchExample(): Unit = { + Observable.interval(300 millis).take(3).map { n => + Observable.interval(100 millis) + .map(l => s"o$n emit $l") + .take(3) + .doOnSubscribe(println(s"subscribe to o$n")) + }.switch.take(5).subscribe(println(_)) + } + + @Test def switchDelayErrorExample(): Unit = { + println("=== switch ===") + Observable.interval(300 millis).take(3).map { n => + if (n == 0) { + Observable.error(new RuntimeException("Oops!")) + } else { + Observable.interval(100 millis) + .map(l => s"o$n emit $l") + .take(3) + .doOnSubscribe(println(s"subscribe to o$n")) + } + }.switch.subscribe(println(_), _.printStackTrace()) + + Thread.sleep(2000) + + println("=== switchDelayError ===") + Observable.interval(300 millis).take(3).map { n => + if (n == 0) { + Observable.error(new RuntimeException("Oops!")) + } else { + Observable.interval(100 millis) + .map(l => s"o$n emit $l") + .take(3) + .doOnSubscribe(println(s"subscribe to o$n")) + } + }.switchDelayError.subscribe(println(_), _.printStackTrace()) + } + @Test def switchMapExample() { val o = Observable.interval(300 millis).take(5).switchMap[String] { n => Observable.interval(50 millis).take(10).map(i => s"Seq ${n}: ${i}") @@ -1519,6 +1585,34 @@ class RxScalaDemo extends JUnitSuite { o.toBlocking.foreach(println) } + @Test def switchMapDelayErrorExample() { + println("=== switchMap ===") + Observable.interval(300 millis).take(3).switchMap { n => + if (n == 0) { + Observable.error(new RuntimeException("Oops!")) + } else { + Observable.interval(100 millis) + .map(l => s"o$n emit $l") + .take(3) + .doOnSubscribe(println(s"subscribe to o$n")) + } + }.subscribe(println(_), _.printStackTrace()) + + Thread.sleep(2000) + + println("=== switchMapDelayError ===") + Observable.interval(300 millis).take(3).switchMapDelayError { n => + if (n == 0) { + Observable.error(new RuntimeException("Oops!")) + } else { + Observable.interval(100 millis) + .map(l => s"o$n emit $l") + .take(3) + .doOnSubscribe(println(s"subscribe to o$n")) + } + }.subscribe(println(_), _.printStackTrace()) + } + @Test def joinExample() { val o1 = Observable.interval(500 millis).map(n => "1: " + n) val o2 = Observable.interval(100 millis).map(n => "2: " + n) @@ -1591,6 +1685,7 @@ class RxScalaDemo extends JUnitSuite { @Test def onBackpressureDropExample() { val o = createFastObservable.onBackpressureDrop val l = new CountDownLatch(1) + // Use a small buffer to demonstrate the drop behavior o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] { override def onStart() { request(1) @@ -1713,6 +1808,12 @@ class RxScalaDemo extends JUnitSuite { }).subscribe(println(_)) } + @Test def concatMapEagerExample3(): Unit = { + (0 until 10).toObservable.concatMapEager(capacityHint = 10, maxConcurrent = 3, i => { + Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i")) + }).subscribe(println(_)) + } + @Test def flattenDelayErrorExample() { val o1 = Observable.just(1).delay(200 millis). flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1")) @@ -1778,4 +1879,35 @@ class RxScalaDemo extends JUnitSuite { println("3rd Observer is subscribing") o.subscribe(i => println(s"s3: $i")) } + + def concatDelayErrorExample(): Unit = { + val o1 = Observable.error(new RuntimeException("Oops!")) + val o2 = Observable.just(1, 2, 3) + val os = Observable.just(o1, o2) + os.concat.subscribe(i => println("concat: " + i), e => println("concat: " + e.getMessage)) + os.concatDelayError.subscribe(i => println("concatDelayError: " + i), e => println("concatDelayError: " + e.getMessage)) + } + + def onTerminateDetachExample(): Unit = { + { + var o = new Object + val weakReference = new scala.ref.WeakReference[Object](o) + // s will have a reference to "o" without onTerminateDetach + val s = Observable.just(o).size.subscribe(println(_)) + o = null + System.gc() // It's fine for a demo even if it's just a best effort + Thread.sleep(2000) + println(s"without onTerminateDetach: isUnsubscribed=${s.isUnsubscribed}, weakReference=${weakReference.get}") + } + { + var o = new Object + val weakReference = new scala.ref.WeakReference[Object](o) + // s won't have a reference to "o" since they are detached. + val s = Observable.just(o).size.onTerminateDetach.subscribe(println(_)) + o = null + System.gc() // It's fine for a demo even if it's just a best effort + Thread.sleep(2000) + println(s"onTerminateDetach: isUnsubscribed=${s.isUnsubscribed}, weakReference=${weakReference.get}") + } + } } diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala index 18d0dc2b..4419ffec 100644 --- a/src/main/scala/rx/lang/scala/Observable.scala +++ b/src/main/scala/rx/lang/scala/Observable.scala @@ -100,6 +100,10 @@ import scala.reflect.ClassTag * ===Scheduler:=== * This method does not operate by default on a particular [[Scheduler]]. * + * @define supportBackpressure + * ===Backpressure:=== + * Fully supports backpressure. + * * @define debounceVsThrottle * Information on debounce vs throttle: * - [[http://drupalmotion.com/article/debounce-and-throttle-visual-explanation]] @@ -312,6 +316,26 @@ trait Observable[+T] toScalaObservable[U](o5) } + /** + * $experimental Concatenates the [[Observable]] sequence of [[Observable]]s into a single sequence by subscribing to + * each inner [[Observable]], one after the other, one at a time and delays any errors till the all inner and the + * outer [[Observable]]s terminate. + * + * $supportBackpressure + * + * $noDefaultScheduler + * + * @return the new [[Observable]] with the concatenating behavior + */ + @Experimental + def concatDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { + val o2: Observable[Observable[U]] = this + val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) + val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable + val o5 = rx.Observable.concatDelayError[U](o4) + toScalaObservable[U](o5) + } + /** * Returns a new Observable that emits items resulting from applying a function that you supply to each item * emitted by the source Observable, where that function returns an Observable, and then emitting the items @@ -331,6 +355,27 @@ trait Observable[+T] })) } + /** + * $experimental Maps each of the items into an [[Observable]], subscribes to them one after the other, + * one at a time and emits their values in order while delaying any error from either this or any of the inner [[Observable]]s + * till all of them terminate. + * + * $supportBackpressure + * + * $noDefaultScheduler + * + * @param f the function that maps the items of this [[Observable]] into the inner [[Observable]]s. + * @return the new [[Observable]] instance with the concatenation behavior + */ + @Experimental + def concatMapDelayError[R](f: T => Observable[R]): Observable[R] = { + toScalaObservable[R](asJavaObservable.concatMapDelayError[R](new Func1[T, rx.Observable[_ <: R]] { + def call(t1: T): rx.Observable[_ <: R] = { + f(t1).asJavaObservable + } + })) + } + /** * $experimental Concatenates `this` and `that` source [[Observable]]s eagerly into a single stream of values. * @@ -468,6 +513,34 @@ trait Observable[+T] }, capacityHint)) } + /** + * $experimental Maps a sequence of values into [[Observable]]s and concatenates these [[Observable]]s eagerly into a single [[Observable]]. + * + * Eager concatenation means that once a [[Subscriber]] subscribes, this operator subscribes to all of the + * source [[Observable]]s. The operator buffers the values emitted by these [[Observable]]s and then drains them in + * order, each one after the previous one completes. + * + * ===Backpressure:=== + * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources + * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer. + * + * $noDefaultScheduler + * + * @param capacityHint hints about the number of expected source sequence values + * @param maxConcurrent the maximum number of concurrent subscribed [[Observable]]s + * @param f the function that maps a sequence of values into a sequence of [[Observable]]s that will be eagerly concatenated + * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s returned by + * `f`, one after the other, without interleaving them + */ + @Experimental + def concatMapEager[R](capacityHint: Int, maxConcurrent: Int, f: T => Observable[R]): Observable[R] = { + toScalaObservable[R](asJavaObservable.concatMapEager[R](new Func1[T, rx.Observable[_ <: R]] { + def call(t1: T): rx.Observable[_ <: R] = { + f(t1).asJavaObservable + } + }, capacityHint, maxConcurrent)) + } + /** * Wraps this Observable in another Observable that ensures that the resulting * Observable is chronologically well-behaved. @@ -1363,6 +1436,50 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.observeOn(scheduler, delayError)) } + /** + * REturns an [[Observable]] to perform its emissions and notifications on a specified [[Scheduler]], + * asynchronously with a bounded buffer of configurable size. + * + * Note that `onError` notifications will cut ahead of `onNext` notifications on the emission thread if [[Scheduler]] is truly + * asynchronous. If strict event ordering is required, consider using the + * [[Observable.observeOn(scheduler:rx\.lang\.scala\.Scheduler,delayError:Boolean)*]] overload. + * + * + * + * ===Scheduler:=== + * you specify which [[Scheduler]] this operator will use + * + * @param scheduler the [[Scheduler]] to notify [[Observer]]s on + * @param bufferSize the size of the buffer. + * @return the source [[Observable]] modified so that its [[Observer]]s are notified on the specified [[Scheduler]] + * @see ReactiveX operators documentation: ObserveOn + * @see RxJava Threading Examples + */ + def observeOn(scheduler: Scheduler, bufferSize: Int): Observable[T] = { + toScalaObservable[T](asJavaObservable.observeOn(scheduler, bufferSize)) + } + + /** + * Returns an [[Observable]] to perform its emissions and notifications on a specified {@link Scheduler}, + * asynchronously with a bounded buffer of configurable size and optionally delays `onError` notifications. + * + * + * + * ===Scheduler:=== + * you specify which [[Scheduler]] this operator will use + * + * @param scheduler the [[Scheduler]] to notify [[Observer]]s on + * @param delayError indicates if the `onError` notification may not cut ahead of `onNext` notification on the other side of the + * scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream + * @param bufferSize the size of the buffer + * @return the source [[Observable]] modified so that its [[Observer]]s are notified on the specified [[Scheduler]] + * @see ReactiveX operators documentation: ObserveOn + * @see RxJava Threading Examples + */ + def observeOn(scheduler: Scheduler, delayError: Boolean, bufferSize: Int): Observable[T] = { + toScalaObservable[T](asJavaObservable.observeOn(scheduler, delayError, bufferSize)) + } + /** * Returns an Observable that reverses the effect of [[rx.lang.scala.Observable.materialize]] by * transforming the [[rx.lang.scala.Notification]] objects emitted by the source Observable into the items @@ -2565,6 +2682,27 @@ trait Observable[+T] })) } + /** + * $experimental Returns a new [[Observable]] by applying a function that you supply to each item emitted by the source + * [[Observable]] that returns an [[Observable]], and then emitting the items emitted by the most recently emitted + * of these [[Observable]]s and delays any error until all [[Observable]]s terminate. + * + * + * + * $noDefaultScheduler + * + * @param f a function that, when applied to an item emitted by the source [[Observable]], returns an [[Observable]] + * @return an [[Observable]] that emits the items emitted by the [[Observable]] returned from applying `f` to the most + * recently emitted item emitted by the source [[Observable]] + * @see ReactiveX operators documentation: FlatMap + */ + @Experimental + def switchMapDelayError[R](f: T => Observable[R]): Observable[R] = { + toScalaObservable[R](asJavaObservable.switchMapDelayError[R](new Func1[T, rx.Observable[_ <: R]] { + def call(t: T): rx.Observable[_ <: R] = f(t).asJavaObservable + })) + } + /** * $experimental Returns an [[Observable]] that emits the items emitted by the source [[Observable]] or the items of an alternate * [[Observable]] if the source [[Observable]] is empty. @@ -2606,6 +2744,33 @@ trait Observable[+T] } // Naming: We follow C# (switch), not Java (switchOnNext), because Java just had to avoid clash with keyword + + /** + * $experimental Converts this [[Observable]] that emits [[Observable]]s into an [[Observable]] that emits the items emitted by the + * most recently emitted of those [[Observable]]s and delays any exception until all [[Observable]]s terminate. + * + * + * + * It subscribes to an [[Observable]] that emits [[Observable]]s. Each time it observes one of + * these emitted [[Observable]]s, the [[Observable]] returned by this method begins emitting the items + * emitted by that [[Observable]]. When a new [[Observable]] is emitted, it stops emitting items + * from the earlier-emitted [[Observable]] and begins emitting items from the new one. + * + * $noDefaultScheduler + * + * @return an [[Observable]] that emits the items emitted by the [[Observable]] most recently emitted by the source + * [[Observable]] + * @see ReactiveX operators documentation: Switch + */ + @Experimental + def switchDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { + val o2: Observable[Observable[U]] = this + val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) + val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable + val o5 = rx.Observable.switchOnNextDelayError[U](o4) + toScalaObservable[U](o5) + } + /** * Flattens two Observables into one Observable, without any transformation. * @@ -3869,6 +4034,22 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.doOnTerminate(() => onTerminate)) } + /** + * $experimental Return a new [[Observable]] that will null out references to the upstream [[Producer]] and downstream [[Subscriber]] if + * the sequence is terminated or downstream unsubscribes. + * + * $supportBackpressure + * + * $noDefaultScheduler + * + * @return an [[Observable]] which out references to the upstream [[Producer]] and downstream [[Subscriber]] if the sequence is + * terminated or downstream unsubscribes + */ + @Experimental + def onTerminateDetach: Observable[T] = { + toScalaObservable[T](asJavaObservable.onTerminateDetach()) + } + /** * Modifies the source `Observable` so that it invokes the given action when it is unsubscribed from * its subscribers. Each un-subscription will result in an invocation of the given action except when the @@ -4733,6 +4914,9 @@ trait Observable[+T] * ===Scheduler:=== * This method does not operate by default on a particular [[Scheduler]]. * + * @define supportBackpressure + * ===Backpressure:=== + * Fully supports backpressure. */ object Observable { import scala.collection.JavaConverters._ @@ -4768,7 +4952,6 @@ object Observable { * See Rx Design Guidelines (PDF) * for detailed information. * - * * @tparam T * the type of the items that this Observable emits. * @param f @@ -5160,6 +5343,7 @@ object Observable { * @return an Observable that emits items that are the result of combining the items emitted by the source * Observables by means of the given aggregation function */ + @deprecated("Use [[[Observable.combineLatest[T,R](sources:Iterable[rx\\.lang\\.scala\\.Observable[T]])(combineFunction:Seq[T]=>R):*]]] instead", "0.26.2") def combineLatest[T, R](sources: Seq[Observable[T]])(combineFunction: Seq[T] => R): Observable[R] = { val jSources = new java.util.ArrayList[rx.Observable[_ <: T]](sources.map(_.asJavaObservable).asJava) val jCombineFunction = new rx.functions.FuncN[R] { @@ -5167,5 +5351,55 @@ object Observable { } toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction)) } + + /** + * Combines an [[scala.collection.Iterable Iterable]] of source [[Observable]]s by emitting an item that aggregates the latest + * values of each of the source [[Observable]]s each time an item is received from any of the source [[Observable]]s, where this + * aggregation is defined by a specified function. + * + * $supportBackpressure + * + * $noDefaultScheduler + * + * @tparam T the common base type of source values + * @tparam R the result type + * @param sources the [[scala.collection.Iterable Iterable]] of source [[Observable]]s + * @param combineFunction the aggregation function used to combine the items emitted by the source [[Observable]]s + * @return an [[Observable]] that emits items that are the result of combining the items emitted by the source + * [[Observable]]s by means of the given aggregation function + * @see ReactiveX operators documentation: CombineLatest + */ + def combineLatest[T, R](sources: Iterable[Observable[T]])(combineFunction: Seq[T] => R): Observable[R] = { + val jSources = sources.map(_.asJavaObservable).asJava + val jCombineFunction = new rx.functions.FuncN[R] { + override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T])) + } + toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction)) + } + + /** + * Combines an [[scala.collection.Iterable Iterable]] of source [[Observable]]s by emitting an item that aggregates the latest + * values of each of the source [[Observable]]s each time an item is received from any of the source [[Observable]]s, where this + * aggregation is defined by a specified function and delays any error from the sources until all source [[Observable]]s terminate. + * + * $supportBackpressure + * + * $noDefaultScheduler + * + * @tparam T the common base type of source values + * @tparam R the result type + * @param sources the [[scala.collection.Iterable Iterable]] of source [[Observable]]s + * @param combineFunction the aggregation function used to combine the items emitted by the source [Observable]]s + * @return an [[Observable]] that emits items that are the result of combining the items emitted by the source + * [[Observable]]s by means of the given aggregation function + * @see ReactiveX operators documentation: CombineLatest + */ + def combineLatestDelayError[T, R](sources: Iterable[Observable[T]])(combineFunction: Seq[T] => R): Observable[R] = { + val jSources = sources.map(_.asJavaObservable).asJava + val jCombineFunction = new rx.functions.FuncN[R] { + override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T])) + } + toScalaObservable[R](rx.Observable.combineLatestDelayError[T, R](jSources, jCombineFunction)) + } } diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala index e3756f2d..81adb50f 100644 --- a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala +++ b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala @@ -62,6 +62,8 @@ class ObservableCompletenessKit extends CompletenessKit { "collect(Func0[R], Action2[R, _ >: T])" -> "[TODO: See /~https://github.com/ReactiveX/RxScala/issues/63]", "compose(Transformer[_ >: T, _ <: R])" -> "[use extension methods instead]", "concatMapEager(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "concatMapEager(Int, T => Observable[R])", + "concatMapEager(Func1[_ >: T, _ <: Observable[_ <: R]], Int, Int)" -> "concatMapEager(Int, Int, T => Observable[R])", + "concatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: R]])" -> "[use `concatMap(t => f(t).toObservable)`]", "concatWith(Observable[_ <: T])" -> "[use `o1 ++ o2`]", "contains(Any)" -> "contains(U)", "count()" -> "length", @@ -96,6 +98,8 @@ class ObservableCompletenessKit extends CompletenessKit { "limit(Int)" -> "take(Int)", "flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapWith(T => Observable[U])((T, U) => R)", "flatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapIterableWith(T => Iterable[U])((T, U) => R)", + "flatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: R]], Int)" -> "[use `flatMap(int, t => collectionSelector(t).toObservable)`]", + "flatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R], Int)" -> "[use `flatMapWith(Int, t => collectionSelector(t).toObservable)(resultSelector)`]", "flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R], Int)" -> "flatMapWith(Int, T => Observable[U])((T, U) => R)", "flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R])", "flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Func1[_ >: Throwable, _ <: Observable[_ <: R]], Func0[_ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R], Throwable => Observable[R], () => Observable[R])", @@ -103,6 +107,7 @@ class ObservableCompletenessKit extends CompletenessKit { "mergeWith(Observable[_ <: T])" -> "merge(Observable[U])", "ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])`]", "onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, => Unit)", + "onBackpressureBuffer(Long, Action0, Strategy)" -> "[TODO]", "onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])", "onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Throwable => Observable[U])", "onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)", @@ -188,10 +193,15 @@ class ObservableCompletenessKit extends CompletenessKit { // manually added entries for Java static methods "amb(Iterable[_ <: Observable[_ <: T]])" -> "amb(Observable[T]*)", "create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)", - "combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])", - "combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]])(Seq[T] => R)", - "combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "[use `combineLatest(iter.toSeq)(Seq[T] => R)`]", + "create(SyncOnSubscribe[S, T])" -> "[TODO]", + "create(AsyncOnSubscribe[S, T])" -> "[TODO]", + "combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatestWith(Observable[U])((T, U) => R)", + "combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)", + "combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)", + "combineLatestDelayError(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)", "concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])", + "concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "concatDelayError(<:<[Observable[T], Observable[Observable[U]]])", + "concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concatDelayError`]", "concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])", "concatEager(Observable[_ <: Observable[_ <: T]], Int)" -> "concatEager(Int)(<:<[Observable[T], Observable[Observable[U]]])", "concatEager(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concatEager`]", @@ -221,6 +231,7 @@ class ObservableCompletenessKit extends CompletenessKit { "range(Int, Int)" -> commentForRange, "range(Int, Int, Scheduler)" -> "[use `(start until end).toObservable.subscribeOn(scheduler)` instead of `range(start, count, scheduler)`]", "switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])", + "switchOnNextDelayError(Observable[_ <: Observable[_ <: T]])" -> "switchDelayError(<:<[Observable[T], Observable[Observable[U]]])", "using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource])" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)", "using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource], Boolean)" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)", "withLatestFrom(Observable[_ <: U], Func2[_ >: T, _ >: U, _ <: R])" -> "withLatestFrom(Observable[U])((T, U) => R)", From 2bc482e78b9571cf877b731296a37801d49ace59 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 10 Jun 2016 23:54:38 -0700 Subject: [PATCH 2/3] Add ErrorDelayingObservable --- .../src/test/scala/examples/RxScalaDemo.scala | 18 +- src/main/scala/rx/lang/scala/Observable.scala | 136 ++----------- .../observables/ErrorDelayingObservable.scala | 191 ++++++++++++++++++ .../ObservableCompletenessKit.scala | 18 +- 4 files changed, 224 insertions(+), 139 deletions(-) create mode 100644 src/main/scala/rx/lang/scala/observables/ErrorDelayingObservable.scala diff --git a/examples/src/test/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/examples/RxScalaDemo.scala index 72c0cd2a..795e83be 100644 --- a/examples/src/test/scala/examples/RxScalaDemo.scala +++ b/examples/src/test/scala/examples/RxScalaDemo.scala @@ -1511,9 +1511,9 @@ class RxScalaDemo extends JUnitSuite { if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i) }.subscribe(println(_), _.printStackTrace) - println("=== concatMapDelayError ===") + println("=== delayError.concatMap ===") (1 to 10).toObservable - .concatMapDelayError{ i => + .delayError.concatMap { i => if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i) }.subscribe(println(_), _.printStackTrace) } @@ -1565,7 +1565,7 @@ class RxScalaDemo extends JUnitSuite { Thread.sleep(2000) - println("=== switchDelayError ===") + println("=== delayError.switch ===") Observable.interval(300 millis).take(3).map { n => if (n == 0) { Observable.error(new RuntimeException("Oops!")) @@ -1575,7 +1575,7 @@ class RxScalaDemo extends JUnitSuite { .take(3) .doOnSubscribe(println(s"subscribe to o$n")) } - }.switchDelayError.subscribe(println(_), _.printStackTrace()) + }.delayError.switch.subscribe(println(_), _.printStackTrace()) } @Test def switchMapExample() { @@ -1600,8 +1600,8 @@ class RxScalaDemo extends JUnitSuite { Thread.sleep(2000) - println("=== switchMapDelayError ===") - Observable.interval(300 millis).take(3).switchMapDelayError { n => + println("=== delayError.switchMap ===") + Observable.interval(300 millis).take(3).delayError.switchMap { n => if (n == 0) { Observable.error(new RuntimeException("Oops!")) } else { @@ -1820,7 +1820,7 @@ class RxScalaDemo extends JUnitSuite { val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2")) val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3")) val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4")) - Observable.just(o1, o2, o3, o4).flattenDelayError.subscribe(println(_), _.printStackTrace()) + Observable.just(o1, o2, o3, o4).delayError.flatten.subscribe(println(_), _.printStackTrace()) } @Test def flattenDelayErrorExample2() { @@ -1829,7 +1829,7 @@ class RxScalaDemo extends JUnitSuite { val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2")) val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3")) val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4")) - Observable.just(o1, o2, o3, o4).flattenDelayError(2).subscribe(println(_), _.printStackTrace()) + Observable.just(o1, o2, o3, o4).delayError.flatten(2).subscribe(println(_), _.printStackTrace()) } @Test def blockingObservableSubscribeExample(): Unit = { @@ -1885,7 +1885,7 @@ class RxScalaDemo extends JUnitSuite { val o2 = Observable.just(1, 2, 3) val os = Observable.just(o1, o2) os.concat.subscribe(i => println("concat: " + i), e => println("concat: " + e.getMessage)) - os.concatDelayError.subscribe(i => println("concatDelayError: " + i), e => println("concatDelayError: " + e.getMessage)) + os.delayError.concat.subscribe(i => println("concatDelayError: " + i), e => println("concatDelayError: " + e.getMessage)) } def onTerminateDetachExample(): Unit = { diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala index 4419ffec..13567513 100644 --- a/src/main/scala/rx/lang/scala/Observable.scala +++ b/src/main/scala/rx/lang/scala/Observable.scala @@ -19,9 +19,10 @@ package rx.lang.scala import rx.annotations.{Beta, Experimental} import rx.exceptions.OnErrorNotImplementedException import rx.functions.FuncN -import rx.lang.scala.observables.ConnectableObservable +import rx.lang.scala.observables.{ConnectableObservable, ErrorDelayingObservable} import scala.concurrent.duration import java.util + import collection.JavaConversions._ import scala.collection.generic.CanBuildFrom import scala.annotation.unchecked.uncheckedVariance @@ -316,26 +317,6 @@ trait Observable[+T] toScalaObservable[U](o5) } - /** - * $experimental Concatenates the [[Observable]] sequence of [[Observable]]s into a single sequence by subscribing to - * each inner [[Observable]], one after the other, one at a time and delays any errors till the all inner and the - * outer [[Observable]]s terminate. - * - * $supportBackpressure - * - * $noDefaultScheduler - * - * @return the new [[Observable]] with the concatenating behavior - */ - @Experimental - def concatDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { - val o2: Observable[Observable[U]] = this - val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) - val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable - val o5 = rx.Observable.concatDelayError[U](o4) - toScalaObservable[U](o5) - } - /** * Returns a new Observable that emits items resulting from applying a function that you supply to each item * emitted by the source Observable, where that function returns an Observable, and then emitting the items @@ -355,27 +336,6 @@ trait Observable[+T] })) } - /** - * $experimental Maps each of the items into an [[Observable]], subscribes to them one after the other, - * one at a time and emits their values in order while delaying any error from either this or any of the inner [[Observable]]s - * till all of them terminate. - * - * $supportBackpressure - * - * $noDefaultScheduler - * - * @param f the function that maps the items of this [[Observable]] into the inner [[Observable]]s. - * @return the new [[Observable]] instance with the concatenation behavior - */ - @Experimental - def concatMapDelayError[R](f: T => Observable[R]): Observable[R] = { - toScalaObservable[R](asJavaObservable.concatMapDelayError[R](new Func1[T, rx.Observable[_ <: R]] { - def call(t1: T): rx.Observable[_ <: R] = { - f(t1).asJavaObservable - } - })) - } - /** * $experimental Concatenates `this` and `that` source [[Observable]]s eagerly into a single stream of values. * @@ -2682,27 +2642,6 @@ trait Observable[+T] })) } - /** - * $experimental Returns a new [[Observable]] by applying a function that you supply to each item emitted by the source - * [[Observable]] that returns an [[Observable]], and then emitting the items emitted by the most recently emitted - * of these [[Observable]]s and delays any error until all [[Observable]]s terminate. - * - * - * - * $noDefaultScheduler - * - * @param f a function that, when applied to an item emitted by the source [[Observable]], returns an [[Observable]] - * @return an [[Observable]] that emits the items emitted by the [[Observable]] returned from applying `f` to the most - * recently emitted item emitted by the source [[Observable]] - * @see ReactiveX operators documentation: FlatMap - */ - @Experimental - def switchMapDelayError[R](f: T => Observable[R]): Observable[R] = { - toScalaObservable[R](asJavaObservable.switchMapDelayError[R](new Func1[T, rx.Observable[_ <: R]] { - def call(t: T): rx.Observable[_ <: R] = f(t).asJavaObservable - })) - } - /** * $experimental Returns an [[Observable]] that emits the items emitted by the source [[Observable]] or the items of an alternate * [[Observable]] if the source [[Observable]] is empty. @@ -2744,33 +2683,6 @@ trait Observable[+T] } // Naming: We follow C# (switch), not Java (switchOnNext), because Java just had to avoid clash with keyword - - /** - * $experimental Converts this [[Observable]] that emits [[Observable]]s into an [[Observable]] that emits the items emitted by the - * most recently emitted of those [[Observable]]s and delays any exception until all [[Observable]]s terminate. - * - * - * - * It subscribes to an [[Observable]] that emits [[Observable]]s. Each time it observes one of - * these emitted [[Observable]]s, the [[Observable]] returned by this method begins emitting the items - * emitted by that [[Observable]]. When a new [[Observable]] is emitted, it stops emitting items - * from the earlier-emitted [[Observable]] and begins emitting items from the new one. - * - * $noDefaultScheduler - * - * @return an [[Observable]] that emits the items emitted by the [[Observable]] most recently emitted by the source - * [[Observable]] - * @see ReactiveX operators documentation: Switch - */ - @Experimental - def switchDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { - val o2: Observable[Observable[U]] = this - val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) - val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable - val o5 = rx.Observable.switchOnNextDelayError[U](o4) - toScalaObservable[U](o5) - } - /** * Flattens two Observables into one Observable, without any transformation. * @@ -2790,7 +2702,7 @@ trait Observable[+T] toScalaObservable[U](rx.Observable.merge(thisJava, thatJava)) } - /** + /** * This behaves like [[rx.lang.scala.Observable.merge]] except that if any of the merged Observables * notify of an error via [[rx.lang.scala.Observer.onError onError]], `mergeDelayError` will * refrain from propagating that error notification until all of the merged Observables have @@ -2809,6 +2721,7 @@ trait Observable[+T] * @return an Observable that emits items that are the result of flattening the items emitted by * `this` and `that` */ + @deprecated("Use [[[rx.lang.scala.observables.ErrorDelayingObservable.merge delayError.merge]]] instead", "0.26.2") def mergeDelayError[U >: T](that: Observable[U]): Observable[U] = { toScalaObservable[U](rx.Observable.mergeDelayError[U](this.asJavaObservable, that.asJavaObservable)) } @@ -2881,6 +2794,7 @@ trait Observable[+T] * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this` * @see ReactiveX operators documentation: Merge */ + @deprecated("Use [[[rx.lang.scala.observables.ErrorDelayingObservable.flatten[U](implicit* delayError.flatten]]] instead", "0.26.2") def flattenDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { val o2: Observable[Observable[U]] = this val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) @@ -2889,37 +2803,6 @@ trait Observable[+T] toScalaObservable[U](o5) } - /** - * $experimental Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to - * receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by - * an error notification from one of them, while limiting the - * number of concurrent subscriptions to these [[Observable]]s. - * - * This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an - * error via `onError`, `flattenDelayError` will refrain from propagating that - * error notification until all of the merged [[Observable]]s have finished emitting items. - * - * - * - * Even if multiple merged [[Observable]]s send `onError` notifications, `flattenDelayError` will only - * invoke the `onError` method of its `Observer`s once. - * - * $noDefaultScheduler - * - * @param maxConcurrent the maximum number of [[Observable]]s that may be subscribed to concurrently - * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this` - * @see ReactiveX operators documentation: Merge - * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) - */ - @Experimental - def flattenDelayError[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { - val o2: Observable[Observable[U]] = this - val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) - val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable - val o5 = rx.Observable.mergeDelayError[U](o4, maxConcurrent) - toScalaObservable[U](o5) - } - /** * Combines two observables, emitting a pair of the latest values of each of * the source observables each time an event is received from one of the source observables. @@ -3890,6 +3773,15 @@ trait Observable[+T] new BlockingObservable[T](this) } + /** + * $experimental Converts an [[Observable]] into a [[rx.lang.scala.observables.ErrorDelayingObservable ErrorDelayingObservable]] + * that provides operators which delay errors when composing multiple [[Observable]]s. + */ + @Experimental + def delayError: ErrorDelayingObservable[T] = { + new ErrorDelayingObservable[T](this) + } + /** Tests whether a predicate holds for some of the elements of this `Observable`. * * @param p the predicate used to test elements. diff --git a/src/main/scala/rx/lang/scala/observables/ErrorDelayingObservable.scala b/src/main/scala/rx/lang/scala/observables/ErrorDelayingObservable.scala new file mode 100644 index 00000000..c1c927ea --- /dev/null +++ b/src/main/scala/rx/lang/scala/observables/ErrorDelayingObservable.scala @@ -0,0 +1,191 @@ +package rx.lang.scala.observables + +import rx.annotations.Experimental +import rx.lang.scala.JavaConversions._ +import rx.lang.scala._ +import ImplicitFunctionConversions._ + +/** + * $experimental An [[Observable]] that provides operators which delay errors when composing multiple [[Observable]]s. + * + * @define noDefaultScheduler + * ===Scheduler:=== + * This method does not operate by default on a particular [[Scheduler]]. + * + * @define supportBackpressure + * ===Backpressure:=== + * Fully supports backpressure. + * + * @define experimental + * EXPERIMENTAL + */ +@Experimental +class ErrorDelayingObservable[+T] private[scala](val o: Observable[T]) extends AnyVal { + + /** + * $experimental Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to + * receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by + * an error notification from one of them, while limiting the + * number of concurrent subscriptions to these [[Observable]]s. + * + * This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an + * error via `onError`, it will refrain from propagating that + * error notification until all of the merged [[Observable]]s have finished emitting items. + * + * + * + * Even if multiple merged [[Observable]]s send `onError` notifications, it will only + * invoke the `onError` method of its `Observer`s once. + * + * $noDefaultScheduler + * + * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this` + * @see ReactiveX operators documentation: Merge + */ + @Experimental + def flatten[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { + val o2: Observable[Observable[U]] = o + val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) + val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable + val o5 = rx.Observable.mergeDelayError[U](o4) + toScalaObservable[U](o5) + } + + /** + * $experimental Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to + * receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by + * an error notification from one of them, while limiting the + * number of concurrent subscriptions to these [[Observable]]s. + * + * This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an + * error via `onError`, it will refrain from propagating that + * error notification until all of the merged [[Observable]]s have finished emitting items. + * + * + * + * Even if multiple merged [[Observable]]s send `onError` notifications, it will only + * invoke the `onError` method of its `Observer`s once. + * + * $noDefaultScheduler + * + * @param maxConcurrent the maximum number of [[Observable]]s that may be subscribed to concurrently + * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this` + * @see ReactiveX operators documentation: Merge + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + def flatten[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { + val o2: Observable[Observable[U]] = o + val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) + val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable + val o5 = rx.Observable.mergeDelayError[U](o4, maxConcurrent) + toScalaObservable[U](o5) + } + + /** + * This behaves like [[rx.lang.scala.Observable.merge]] except that if any of the merged Observables + * notify of an error via [[rx.lang.scala.Observer.onError onError]], it will + * refrain from propagating that error notification until all of the merged Observables have + * finished emitting items. + * + * + * + * Even if multiple merged Observables send `onError` notifications, it will only invoke the `onError` method of its + * Observers once. + * + * This method allows an Observer to receive all successfully emitted items from all of the + * source Observables without being interrupted by an error notification from one of them. + * + * @param that + * an Observable to be merged + * @return an Observable that emits items that are the result of flattening the items emitted by + * `this` and `that` + */ + def merge[U >: T](that: Observable[U]): Observable[U] = { + toScalaObservable[U](rx.Observable.mergeDelayError[U](o.asJavaObservable, that.asJavaObservable)) + } + + /** + * $experimental Converts this [[Observable]] that emits [[Observable]]s into an [[Observable]] that emits the items emitted by the + * most recently emitted of those [[Observable]]s and delays any exception until all [[Observable]]s terminate. + * + * + * + * It subscribes to an [[Observable]] that emits [[Observable]]s. Each time it observes one of + * these emitted [[Observable]]s, the [[Observable]] returned by this method begins emitting the items + * emitted by that [[Observable]]. When a new [[Observable]] is emitted, it stops emitting items + * from the earlier-emitted [[Observable]] and begins emitting items from the new one. + * + * $noDefaultScheduler + * + * @return an [[Observable]] that emits the items emitted by the [[Observable]] most recently emitted by the source + * [[Observable]] + * @see ReactiveX operators documentation: Switch + */ + @Experimental + def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { + val o2: Observable[Observable[U]] = o + val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) + val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable + val o5 = rx.Observable.switchOnNextDelayError[U](o4) + toScalaObservable[U](o5) + } + + /** + * $experimental Returns a new [[Observable]] by applying a function that you supply to each item emitted by the source + * [[Observable]] that returns an [[Observable]], and then emitting the items emitted by the most recently emitted + * of these [[Observable]]s and delays any error until all [[Observable]]s terminate. + * + * + * + * $noDefaultScheduler + * + * @param f a function that, when applied to an item emitted by the source [[Observable]], returns an [[Observable]] + * @return an [[Observable]] that emits the items emitted by the [[Observable]] returned from applying `f` to the most + * recently emitted item emitted by the source [[Observable]] + * @see ReactiveX operators documentation: FlatMap + */ + @Experimental + def switchMap[R](f: T => Observable[R]): Observable[R] = { + val jf: rx.functions.Func1[T, rx.Observable[_ <: R]] = (t: T) => f(t).asJavaObservable + toScalaObservable[R](o.asJavaObservable.switchMapDelayError[R](jf)) + } + + /** + * $experimental Concatenates the [[Observable]] sequence of [[Observable]]s into a single sequence by subscribing to + * each inner [[Observable]], one after the other, one at a time and delays any errors till the all inner and the + * outer [[Observable]]s terminate. + * + * $supportBackpressure + * + * $noDefaultScheduler + * + * @return the new [[Observable]] with the concatenating behavior + */ + @Experimental + def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { + val o2: Observable[Observable[U]] = o + val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) + val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable + val o5 = rx.Observable.concatDelayError[U](o4) + toScalaObservable[U](o5) + } + + /** + * $experimental Maps each of the items into an [[Observable]], subscribes to them one after the other, + * one at a time and emits their values in order while delaying any error from either this or any of the inner [[Observable]]s + * till all of them terminate. + * + * $supportBackpressure + * + * $noDefaultScheduler + * + * @param f the function that maps the items of this [[Observable]] into the inner [[Observable]]s. + * @return the new [[Observable]] instance with the concatenation behavior + */ + @Experimental + def concatMap[R](f: T => Observable[R]): Observable[R] = { + val jf: rx.functions.Func1[T, rx.Observable[_ <: R]] = (t: T) => f(t).asJavaObservable + toScalaObservable[R](o.asJavaObservable.concatMapDelayError[R](jf)) + } +} diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala index 81adb50f..2162dcb6 100644 --- a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala +++ b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala @@ -64,6 +64,7 @@ class ObservableCompletenessKit extends CompletenessKit { "concatMapEager(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "concatMapEager(Int, T => Observable[R])", "concatMapEager(Func1[_ >: T, _ <: Observable[_ <: R]], Int, Int)" -> "concatMapEager(Int, Int, T => Observable[R])", "concatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: R]])" -> "[use `concatMap(t => f(t).toObservable)`]", + "concatMapDelayError(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "[use `delayError.concatMap(T => Observable[R])`]", "concatWith(Observable[_ <: T])" -> "[use `o1 ++ o2`]", "contains(Any)" -> "contains(U)", "count()" -> "length", @@ -147,6 +148,7 @@ class ObservableCompletenessKit extends CompletenessKit { "skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)", "subscribe()" -> "subscribe()", "switchIfEmpty(Observable[_ <: T])" -> "switchIfEmpty(Observable[U])", + "switchMapDelayError(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "[use `delayError.switchMap(T => Observable[R])`]", "takeFirst(Func1[_ >: T, Boolean])" -> "[use `filter(condition).take(1)`]", "takeLast(Int)" -> "takeRight(Int)", "takeLast(Long, TimeUnit)" -> "takeRight(Duration)", @@ -200,8 +202,8 @@ class ObservableCompletenessKit extends CompletenessKit { "combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)", "combineLatestDelayError(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)", "concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])", - "concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "concatDelayError(<:<[Observable[T], Observable[Observable[U]]])", - "concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concatDelayError`]", + "concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.concatDelay(<:<[Observable[T], Observable[Observable[U]]])`]", + "concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.concatDelay`]", "concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])", "concatEager(Observable[_ <: Observable[_ <: T]], Int)" -> "concatEager(Int)(<:<[Observable[T], Observable[Observable[U]]])", "concatEager(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concatEager`]", @@ -221,17 +223,17 @@ class ObservableCompletenessKit extends CompletenessKit { "merge(Iterable[_ <: Observable[_ <: T]])" -> "[use `Observable.from(iter).flatten`]", "merge(Array[Observable[_ <: T]], Int)" -> "[use `Observable.from(array).flatten(n)`]", "merge(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `Observable.from(iter).flatten(n)`]", - "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])", - "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", - "mergeDelayError(Observable[_ <: Observable[_ <: T]], Int)" -> "flattenDelayError(Int)(<:<[Observable[T], Observable[Observable[U]]])", - "mergeDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.flattenDelayError`]", - "mergeDelayError(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `iter.toObservable.flattenDelayError(Int)`]", + "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "[use `delayError.merge(Observable[U])`]", + "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.flatten(<:<[Observable[T], Observable[Observable[U]]])`]", + "mergeDelayError(Observable[_ <: Observable[_ <: T]], Int)" -> "[use `delayError.flatten(Int)(<:<[Observable[T], Observable[Observable[U]]])`]", + "mergeDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.flatten`]", + "mergeDelayError(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `iter.toObservable.delayError.flatten(Int)`]", "sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "sequenceEqual(Observable[U])", "sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "sequenceEqualWith(Observable[U])((U, U) => Boolean)", "range(Int, Int)" -> commentForRange, "range(Int, Int, Scheduler)" -> "[use `(start until end).toObservable.subscribeOn(scheduler)` instead of `range(start, count, scheduler)`]", "switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])", - "switchOnNextDelayError(Observable[_ <: Observable[_ <: T]])" -> "switchDelayError(<:<[Observable[T], Observable[Observable[U]]])", + "switchOnNextDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.switch(<:<[Observable[T], Observable[Observable[U]]])`]", "using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource])" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)", "using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource], Boolean)" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)", "withLatestFrom(Observable[_ <: U], Func2[_ >: T, _ >: U, _ <: R])" -> "withLatestFrom(Observable[U])((T, U) => R)", From 4d96d5711a5c32d211cca1bdfa8c5d9457cab922 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Sun, 12 Jun 2016 14:48:58 -0700 Subject: [PATCH 3/3] Fix ObservableCompletenessKit --- .../lang/scala/completeness/ObservableCompletenessKit.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala index 2162dcb6..83133dc8 100644 --- a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala +++ b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala @@ -202,8 +202,8 @@ class ObservableCompletenessKit extends CompletenessKit { "combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)", "combineLatestDelayError(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)", "concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])", - "concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.concatDelay(<:<[Observable[T], Observable[Observable[U]]])`]", - "concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.concatDelay`]", + "concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.concat`]", + "concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.concat`]", "concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])", "concatEager(Observable[_ <: Observable[_ <: T]], Int)" -> "concatEager(Int)(<:<[Observable[T], Observable[Observable[U]]])", "concatEager(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concatEager`]",