diff --git a/build.sbt b/build.sbt
index 5e9b7773..a625ab16 100644
--- a/build.sbt
+++ b/build.sbt
@@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4")
parallelExecution in Test := false
libraryDependencies ++= Seq(
- "io.reactivex" % "rxjava" % "1.1.1",
+ "io.reactivex" % "rxjava" % "1.1.5",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")
diff --git a/examples/src/test/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/examples/RxScalaDemo.scala
index e1f677a2..795e83be 100644
--- a/examples/src/test/scala/examples/RxScalaDemo.scala
+++ b/examples/src/test/scala/examples/RxScalaDemo.scala
@@ -409,13 +409,28 @@ class RxScalaDemo extends JUnitSuite {
val firstCounter = Observable.interval(250 millis)
val secondCounter = Observable.interval(550 millis)
val thirdCounter = Observable.interval(850 millis)
- val sources = Seq(firstCounter, secondCounter, thirdCounter)
+ val sources = Iterable(firstCounter, secondCounter, thirdCounter)
val combinedCounter = Observable.combineLatest(sources)(_.toList).take(10)
combinedCounter subscribe {x => println(s"Emitted group: $x")}
waitFor(combinedCounter)
}
+ @Test def combineLatestDelayErrorExample() {
+ val firstCounter = Observable.just(1) ++ Observable.error(new RuntimeException("Oops!"))
+ val secondCounter = Observable.interval(550 millis).take(3)
+ val sources = Iterable(firstCounter, secondCounter)
+
+ Observable.combineLatest(sources)(_.toList).subscribe(
+ i => println("combineLatest: " + i),
+ e => println("combineLatest: " + e.getMessage))
+ Thread.sleep(2000)
+ Observable.combineLatestDelayError(sources)(_.toList).subscribe(
+ i => println("combineLatestDelayError: " + i),
+ e => println("combineLatestDelayError: " + e.getMessage))
+ Thread.sleep(2000)
+ }
+
@Test def olympicsExampleWithoutPublish() {
val medals = Olympics.mountainBikeMedals.doOnEach(_ => println("onNext"))
medals.subscribe(println(_)) // triggers an execution of medals Observable
@@ -1489,6 +1504,20 @@ class RxScalaDemo extends JUnitSuite {
.toBlocking.foreach(println)
}
+ @Test def concatMapDelayErrorExample() {
+ println("=== concatMap ===")
+ (1 to 10).toObservable
+ .concatMap { i =>
+ if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i)
+ }.subscribe(println(_), _.printStackTrace)
+
+ println("=== delayError.concatMap ===")
+ (1 to 10).toObservable
+ .delayError.concatMap { i =>
+ if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i)
+ }.subscribe(println(_), _.printStackTrace)
+ }
+
@Test def onErrorResumeNextExample() {
val o = Observable {
(subscriber: Subscriber[Int]) =>
@@ -1512,6 +1541,43 @@ class RxScalaDemo extends JUnitSuite {
}.subscribe(println(_))
}
+ @Test def switchExample(): Unit = {
+ Observable.interval(300 millis).take(3).map { n =>
+ Observable.interval(100 millis)
+ .map(l => s"o$n emit $l")
+ .take(3)
+ .doOnSubscribe(println(s"subscribe to o$n"))
+ }.switch.take(5).subscribe(println(_))
+ }
+
+ @Test def switchDelayErrorExample(): Unit = {
+ println("=== switch ===")
+ Observable.interval(300 millis).take(3).map { n =>
+ if (n == 0) {
+ Observable.error(new RuntimeException("Oops!"))
+ } else {
+ Observable.interval(100 millis)
+ .map(l => s"o$n emit $l")
+ .take(3)
+ .doOnSubscribe(println(s"subscribe to o$n"))
+ }
+ }.switch.subscribe(println(_), _.printStackTrace())
+
+ Thread.sleep(2000)
+
+ println("=== delayError.switch ===")
+ Observable.interval(300 millis).take(3).map { n =>
+ if (n == 0) {
+ Observable.error(new RuntimeException("Oops!"))
+ } else {
+ Observable.interval(100 millis)
+ .map(l => s"o$n emit $l")
+ .take(3)
+ .doOnSubscribe(println(s"subscribe to o$n"))
+ }
+ }.delayError.switch.subscribe(println(_), _.printStackTrace())
+ }
+
@Test def switchMapExample() {
val o = Observable.interval(300 millis).take(5).switchMap[String] {
n => Observable.interval(50 millis).take(10).map(i => s"Seq ${n}: ${i}")
@@ -1519,6 +1585,34 @@ class RxScalaDemo extends JUnitSuite {
o.toBlocking.foreach(println)
}
+ @Test def switchMapDelayErrorExample() {
+ println("=== switchMap ===")
+ Observable.interval(300 millis).take(3).switchMap { n =>
+ if (n == 0) {
+ Observable.error(new RuntimeException("Oops!"))
+ } else {
+ Observable.interval(100 millis)
+ .map(l => s"o$n emit $l")
+ .take(3)
+ .doOnSubscribe(println(s"subscribe to o$n"))
+ }
+ }.subscribe(println(_), _.printStackTrace())
+
+ Thread.sleep(2000)
+
+ println("=== delayError.switchMap ===")
+ Observable.interval(300 millis).take(3).delayError.switchMap { n =>
+ if (n == 0) {
+ Observable.error(new RuntimeException("Oops!"))
+ } else {
+ Observable.interval(100 millis)
+ .map(l => s"o$n emit $l")
+ .take(3)
+ .doOnSubscribe(println(s"subscribe to o$n"))
+ }
+ }.subscribe(println(_), _.printStackTrace())
+ }
+
@Test def joinExample() {
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
@@ -1591,6 +1685,7 @@ class RxScalaDemo extends JUnitSuite {
@Test def onBackpressureDropExample() {
val o = createFastObservable.onBackpressureDrop
val l = new CountDownLatch(1)
+ // Use a small buffer to demonstrate the drop behavior
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
override def onStart() {
request(1)
@@ -1713,13 +1808,19 @@ class RxScalaDemo extends JUnitSuite {
}).subscribe(println(_))
}
+ @Test def concatMapEagerExample3(): Unit = {
+ (0 until 10).toObservable.concatMapEager(capacityHint = 10, maxConcurrent = 3, i => {
+ Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
+ }).subscribe(println(_))
+ }
+
@Test def flattenDelayErrorExample() {
val o1 = Observable.just(1).delay(200 millis).
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
- Observable.just(o1, o2, o3, o4).flattenDelayError.subscribe(println(_), _.printStackTrace())
+ Observable.just(o1, o2, o3, o4).delayError.flatten.subscribe(println(_), _.printStackTrace())
}
@Test def flattenDelayErrorExample2() {
@@ -1728,7 +1829,7 @@ class RxScalaDemo extends JUnitSuite {
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
- Observable.just(o1, o2, o3, o4).flattenDelayError(2).subscribe(println(_), _.printStackTrace())
+ Observable.just(o1, o2, o3, o4).delayError.flatten(2).subscribe(println(_), _.printStackTrace())
}
@Test def blockingObservableSubscribeExample(): Unit = {
@@ -1778,4 +1879,35 @@ class RxScalaDemo extends JUnitSuite {
println("3rd Observer is subscribing")
o.subscribe(i => println(s"s3: $i"))
}
+
+ def concatDelayErrorExample(): Unit = {
+ val o1 = Observable.error(new RuntimeException("Oops!"))
+ val o2 = Observable.just(1, 2, 3)
+ val os = Observable.just(o1, o2)
+ os.concat.subscribe(i => println("concat: " + i), e => println("concat: " + e.getMessage))
+ os.delayError.concat.subscribe(i => println("concatDelayError: " + i), e => println("concatDelayError: " + e.getMessage))
+ }
+
+ def onTerminateDetachExample(): Unit = {
+ {
+ var o = new Object
+ val weakReference = new scala.ref.WeakReference[Object](o)
+ // s will have a reference to "o" without onTerminateDetach
+ val s = Observable.just(o).size.subscribe(println(_))
+ o = null
+ System.gc() // It's fine for a demo even if it's just a best effort
+ Thread.sleep(2000)
+ println(s"without onTerminateDetach: isUnsubscribed=${s.isUnsubscribed}, weakReference=${weakReference.get}")
+ }
+ {
+ var o = new Object
+ val weakReference = new scala.ref.WeakReference[Object](o)
+ // s won't have a reference to "o" since they are detached.
+ val s = Observable.just(o).size.onTerminateDetach.subscribe(println(_))
+ o = null
+ System.gc() // It's fine for a demo even if it's just a best effort
+ Thread.sleep(2000)
+ println(s"onTerminateDetach: isUnsubscribed=${s.isUnsubscribed}, weakReference=${weakReference.get}")
+ }
+ }
}
diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala
index 18d0dc2b..13567513 100644
--- a/src/main/scala/rx/lang/scala/Observable.scala
+++ b/src/main/scala/rx/lang/scala/Observable.scala
@@ -19,9 +19,10 @@ package rx.lang.scala
import rx.annotations.{Beta, Experimental}
import rx.exceptions.OnErrorNotImplementedException
import rx.functions.FuncN
-import rx.lang.scala.observables.ConnectableObservable
+import rx.lang.scala.observables.{ConnectableObservable, ErrorDelayingObservable}
import scala.concurrent.duration
import java.util
+
import collection.JavaConversions._
import scala.collection.generic.CanBuildFrom
import scala.annotation.unchecked.uncheckedVariance
@@ -100,6 +101,10 @@ import scala.reflect.ClassTag
* ===Scheduler:===
* This method does not operate by default on a particular [[Scheduler]].
*
+ * @define supportBackpressure
+ * ===Backpressure:===
+ * Fully supports backpressure.
+ *
* @define debounceVsThrottle
* Information on debounce vs throttle:
* - [[http://drupalmotion.com/article/debounce-and-throttle-visual-explanation]]
@@ -468,6 +473,34 @@ trait Observable[+T]
}, capacityHint))
}
+ /**
+ * $experimental Maps a sequence of values into [[Observable]]s and concatenates these [[Observable]]s eagerly into a single [[Observable]].
+ *
+ * Eager concatenation means that once a [[Subscriber]] subscribes, this operator subscribes to all of the
+ * source [[Observable]]s. The operator buffers the values emitted by these [[Observable]]s and then drains them in
+ * order, each one after the previous one completes.
+ *
+ * ===Backpressure:===
+ * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
+ * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
+ *
+ * $noDefaultScheduler
+ *
+ * @param capacityHint hints about the number of expected source sequence values
+ * @param maxConcurrent the maximum number of concurrent subscribed [[Observable]]s
+ * @param f the function that maps a sequence of values into a sequence of [[Observable]]s that will be eagerly concatenated
+ * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s returned by
+ * `f`, one after the other, without interleaving them
+ */
+ @Experimental
+ def concatMapEager[R](capacityHint: Int, maxConcurrent: Int, f: T => Observable[R]): Observable[R] = {
+ toScalaObservable[R](asJavaObservable.concatMapEager[R](new Func1[T, rx.Observable[_ <: R]] {
+ def call(t1: T): rx.Observable[_ <: R] = {
+ f(t1).asJavaObservable
+ }
+ }, capacityHint, maxConcurrent))
+ }
+
/**
* Wraps this Observable in another Observable that ensures that the resulting
* Observable is chronologically well-behaved.
@@ -1363,6 +1396,50 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.observeOn(scheduler, delayError))
}
+ /**
+ * REturns an [[Observable]] to perform its emissions and notifications on a specified [[Scheduler]],
+ * asynchronously with a bounded buffer of configurable size.
+ *
+ * Note that `onError` notifications will cut ahead of `onNext` notifications on the emission thread if [[Scheduler]] is truly
+ * asynchronous. If strict event ordering is required, consider using the
+ * [[Observable.observeOn(scheduler:rx\.lang\.scala\.Scheduler,delayError:Boolean)*]] overload.
+ *
+ *
+ *
+ * ===Scheduler:===
+ * you specify which [[Scheduler]] this operator will use
+ *
+ * @param scheduler the [[Scheduler]] to notify [[Observer]]s on
+ * @param bufferSize the size of the buffer.
+ * @return the source [[Observable]] modified so that its [[Observer]]s are notified on the specified [[Scheduler]]
+ * @see ReactiveX operators documentation: ObserveOn
+ * @see RxJava Threading Examples
+ */
+ def observeOn(scheduler: Scheduler, bufferSize: Int): Observable[T] = {
+ toScalaObservable[T](asJavaObservable.observeOn(scheduler, bufferSize))
+ }
+
+ /**
+ * Returns an [[Observable]] to perform its emissions and notifications on a specified {@link Scheduler},
+ * asynchronously with a bounded buffer of configurable size and optionally delays `onError` notifications.
+ *
+ *
+ *
+ * ===Scheduler:===
+ * you specify which [[Scheduler]] this operator will use
+ *
+ * @param scheduler the [[Scheduler]] to notify [[Observer]]s on
+ * @param delayError indicates if the `onError` notification may not cut ahead of `onNext` notification on the other side of the
+ * scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream
+ * @param bufferSize the size of the buffer
+ * @return the source [[Observable]] modified so that its [[Observer]]s are notified on the specified [[Scheduler]]
+ * @see ReactiveX operators documentation: ObserveOn
+ * @see RxJava Threading Examples
+ */
+ def observeOn(scheduler: Scheduler, delayError: Boolean, bufferSize: Int): Observable[T] = {
+ toScalaObservable[T](asJavaObservable.observeOn(scheduler, delayError, bufferSize))
+ }
+
/**
* Returns an Observable that reverses the effect of [[rx.lang.scala.Observable.materialize]] by
* transforming the [[rx.lang.scala.Notification]] objects emitted by the source Observable into the items
@@ -2625,7 +2702,7 @@ trait Observable[+T]
toScalaObservable[U](rx.Observable.merge(thisJava, thatJava))
}
- /**
+ /**
* This behaves like [[rx.lang.scala.Observable.merge]] except that if any of the merged Observables
* notify of an error via [[rx.lang.scala.Observer.onError onError]], `mergeDelayError` will
* refrain from propagating that error notification until all of the merged Observables have
@@ -2644,6 +2721,7 @@ trait Observable[+T]
* @return an Observable that emits items that are the result of flattening the items emitted by
* `this` and `that`
*/
+ @deprecated("Use [[[rx.lang.scala.observables.ErrorDelayingObservable.merge delayError.merge]]] instead", "0.26.2")
def mergeDelayError[U >: T](that: Observable[U]): Observable[U] = {
toScalaObservable[U](rx.Observable.mergeDelayError[U](this.asJavaObservable, that.asJavaObservable))
}
@@ -2716,6 +2794,7 @@ trait Observable[+T]
* @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this`
* @see ReactiveX operators documentation: Merge
*/
+ @deprecated("Use [[[rx.lang.scala.observables.ErrorDelayingObservable.flatten[U](implicit* delayError.flatten]]] instead", "0.26.2")
def flattenDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
@@ -2724,37 +2803,6 @@ trait Observable[+T]
toScalaObservable[U](o5)
}
- /**
- * $experimental Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to
- * receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by
- * an error notification from one of them, while limiting the
- * number of concurrent subscriptions to these [[Observable]]s.
- *
- * This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an
- * error via `onError`, `flattenDelayError` will refrain from propagating that
- * error notification until all of the merged [[Observable]]s have finished emitting items.
- *
- *
- *
- * Even if multiple merged [[Observable]]s send `onError` notifications, `flattenDelayError` will only
- * invoke the `onError` method of its `Observer`s once.
- *
- * $noDefaultScheduler
- *
- * @param maxConcurrent the maximum number of [[Observable]]s that may be subscribed to concurrently
- * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this`
- * @see ReactiveX operators documentation: Merge
- * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
- */
- @Experimental
- def flattenDelayError[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
- val o2: Observable[Observable[U]] = this
- val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
- val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
- val o5 = rx.Observable.mergeDelayError[U](o4, maxConcurrent)
- toScalaObservable[U](o5)
- }
-
/**
* Combines two observables, emitting a pair of the latest values of each of
* the source observables each time an event is received from one of the source observables.
@@ -3725,6 +3773,15 @@ trait Observable[+T]
new BlockingObservable[T](this)
}
+ /**
+ * $experimental Converts an [[Observable]] into a [[rx.lang.scala.observables.ErrorDelayingObservable ErrorDelayingObservable]]
+ * that provides operators which delay errors when composing multiple [[Observable]]s.
+ */
+ @Experimental
+ def delayError: ErrorDelayingObservable[T] = {
+ new ErrorDelayingObservable[T](this)
+ }
+
/** Tests whether a predicate holds for some of the elements of this `Observable`.
*
* @param p the predicate used to test elements.
@@ -3869,6 +3926,22 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.doOnTerminate(() => onTerminate))
}
+ /**
+ * $experimental Return a new [[Observable]] that will null out references to the upstream [[Producer]] and downstream [[Subscriber]] if
+ * the sequence is terminated or downstream unsubscribes.
+ *
+ * $supportBackpressure
+ *
+ * $noDefaultScheduler
+ *
+ * @return an [[Observable]] which out references to the upstream [[Producer]] and downstream [[Subscriber]] if the sequence is
+ * terminated or downstream unsubscribes
+ */
+ @Experimental
+ def onTerminateDetach: Observable[T] = {
+ toScalaObservable[T](asJavaObservable.onTerminateDetach())
+ }
+
/**
* Modifies the source `Observable` so that it invokes the given action when it is unsubscribed from
* its subscribers. Each un-subscription will result in an invocation of the given action except when the
@@ -4733,6 +4806,9 @@ trait Observable[+T]
* ===Scheduler:===
* This method does not operate by default on a particular [[Scheduler]].
*
+ * @define supportBackpressure
+ * ===Backpressure:===
+ * Fully supports backpressure.
*/
object Observable {
import scala.collection.JavaConverters._
@@ -4768,7 +4844,6 @@ object Observable {
* See Rx Design Guidelines (PDF)
* for detailed information.
*
- *
* @tparam T
* the type of the items that this Observable emits.
* @param f
@@ -5160,6 +5235,7 @@ object Observable {
* @return an Observable that emits items that are the result of combining the items emitted by the source
* Observables by means of the given aggregation function
*/
+ @deprecated("Use [[[Observable.combineLatest[T,R](sources:Iterable[rx\\.lang\\.scala\\.Observable[T]])(combineFunction:Seq[T]=>R):*]]] instead", "0.26.2")
def combineLatest[T, R](sources: Seq[Observable[T]])(combineFunction: Seq[T] => R): Observable[R] = {
val jSources = new java.util.ArrayList[rx.Observable[_ <: T]](sources.map(_.asJavaObservable).asJava)
val jCombineFunction = new rx.functions.FuncN[R] {
@@ -5167,5 +5243,55 @@ object Observable {
}
toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction))
}
+
+ /**
+ * Combines an [[scala.collection.Iterable Iterable]] of source [[Observable]]s by emitting an item that aggregates the latest
+ * values of each of the source [[Observable]]s each time an item is received from any of the source [[Observable]]s, where this
+ * aggregation is defined by a specified function.
+ *
+ * $supportBackpressure
+ *
+ * $noDefaultScheduler
+ *
+ * @tparam T the common base type of source values
+ * @tparam R the result type
+ * @param sources the [[scala.collection.Iterable Iterable]] of source [[Observable]]s
+ * @param combineFunction the aggregation function used to combine the items emitted by the source [[Observable]]s
+ * @return an [[Observable]] that emits items that are the result of combining the items emitted by the source
+ * [[Observable]]s by means of the given aggregation function
+ * @see ReactiveX operators documentation: CombineLatest
+ */
+ def combineLatest[T, R](sources: Iterable[Observable[T]])(combineFunction: Seq[T] => R): Observable[R] = {
+ val jSources = sources.map(_.asJavaObservable).asJava
+ val jCombineFunction = new rx.functions.FuncN[R] {
+ override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T]))
+ }
+ toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction))
+ }
+
+ /**
+ * Combines an [[scala.collection.Iterable Iterable]] of source [[Observable]]s by emitting an item that aggregates the latest
+ * values of each of the source [[Observable]]s each time an item is received from any of the source [[Observable]]s, where this
+ * aggregation is defined by a specified function and delays any error from the sources until all source [[Observable]]s terminate.
+ *
+ * $supportBackpressure
+ *
+ * $noDefaultScheduler
+ *
+ * @tparam T the common base type of source values
+ * @tparam R the result type
+ * @param sources the [[scala.collection.Iterable Iterable]] of source [[Observable]]s
+ * @param combineFunction the aggregation function used to combine the items emitted by the source [Observable]]s
+ * @return an [[Observable]] that emits items that are the result of combining the items emitted by the source
+ * [[Observable]]s by means of the given aggregation function
+ * @see ReactiveX operators documentation: CombineLatest
+ */
+ def combineLatestDelayError[T, R](sources: Iterable[Observable[T]])(combineFunction: Seq[T] => R): Observable[R] = {
+ val jSources = sources.map(_.asJavaObservable).asJava
+ val jCombineFunction = new rx.functions.FuncN[R] {
+ override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T]))
+ }
+ toScalaObservable[R](rx.Observable.combineLatestDelayError[T, R](jSources, jCombineFunction))
+ }
}
diff --git a/src/main/scala/rx/lang/scala/observables/ErrorDelayingObservable.scala b/src/main/scala/rx/lang/scala/observables/ErrorDelayingObservable.scala
new file mode 100644
index 00000000..c1c927ea
--- /dev/null
+++ b/src/main/scala/rx/lang/scala/observables/ErrorDelayingObservable.scala
@@ -0,0 +1,191 @@
+package rx.lang.scala.observables
+
+import rx.annotations.Experimental
+import rx.lang.scala.JavaConversions._
+import rx.lang.scala._
+import ImplicitFunctionConversions._
+
+/**
+ * $experimental An [[Observable]] that provides operators which delay errors when composing multiple [[Observable]]s.
+ *
+ * @define noDefaultScheduler
+ * ===Scheduler:===
+ * This method does not operate by default on a particular [[Scheduler]].
+ *
+ * @define supportBackpressure
+ * ===Backpressure:===
+ * Fully supports backpressure.
+ *
+ * @define experimental
+ * EXPERIMENTAL
+ */
+@Experimental
+class ErrorDelayingObservable[+T] private[scala](val o: Observable[T]) extends AnyVal {
+
+ /**
+ * $experimental Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to
+ * receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by
+ * an error notification from one of them, while limiting the
+ * number of concurrent subscriptions to these [[Observable]]s.
+ *
+ * This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an
+ * error via `onError`, it will refrain from propagating that
+ * error notification until all of the merged [[Observable]]s have finished emitting items.
+ *
+ *
+ *
+ * Even if multiple merged [[Observable]]s send `onError` notifications, it will only
+ * invoke the `onError` method of its `Observer`s once.
+ *
+ * $noDefaultScheduler
+ *
+ * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this`
+ * @see ReactiveX operators documentation: Merge
+ */
+ @Experimental
+ def flatten[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
+ val o2: Observable[Observable[U]] = o
+ val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
+ val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
+ val o5 = rx.Observable.mergeDelayError[U](o4)
+ toScalaObservable[U](o5)
+ }
+
+ /**
+ * $experimental Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to
+ * receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by
+ * an error notification from one of them, while limiting the
+ * number of concurrent subscriptions to these [[Observable]]s.
+ *
+ * This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an
+ * error via `onError`, it will refrain from propagating that
+ * error notification until all of the merged [[Observable]]s have finished emitting items.
+ *
+ *
+ *
+ * Even if multiple merged [[Observable]]s send `onError` notifications, it will only
+ * invoke the `onError` method of its `Observer`s once.
+ *
+ * $noDefaultScheduler
+ *
+ * @param maxConcurrent the maximum number of [[Observable]]s that may be subscribed to concurrently
+ * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this`
+ * @see ReactiveX operators documentation: Merge
+ * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
+ */
+ @Experimental
+ def flatten[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
+ val o2: Observable[Observable[U]] = o
+ val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
+ val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
+ val o5 = rx.Observable.mergeDelayError[U](o4, maxConcurrent)
+ toScalaObservable[U](o5)
+ }
+
+ /**
+ * This behaves like [[rx.lang.scala.Observable.merge]] except that if any of the merged Observables
+ * notify of an error via [[rx.lang.scala.Observer.onError onError]], it will
+ * refrain from propagating that error notification until all of the merged Observables have
+ * finished emitting items.
+ *
+ *
+ *
+ * Even if multiple merged Observables send `onError` notifications, it will only invoke the `onError` method of its
+ * Observers once.
+ *
+ * This method allows an Observer to receive all successfully emitted items from all of the
+ * source Observables without being interrupted by an error notification from one of them.
+ *
+ * @param that
+ * an Observable to be merged
+ * @return an Observable that emits items that are the result of flattening the items emitted by
+ * `this` and `that`
+ */
+ def merge[U >: T](that: Observable[U]): Observable[U] = {
+ toScalaObservable[U](rx.Observable.mergeDelayError[U](o.asJavaObservable, that.asJavaObservable))
+ }
+
+ /**
+ * $experimental Converts this [[Observable]] that emits [[Observable]]s into an [[Observable]] that emits the items emitted by the
+ * most recently emitted of those [[Observable]]s and delays any exception until all [[Observable]]s terminate.
+ *
+ *
+ *
+ * It subscribes to an [[Observable]] that emits [[Observable]]s. Each time it observes one of
+ * these emitted [[Observable]]s, the [[Observable]] returned by this method begins emitting the items
+ * emitted by that [[Observable]]. When a new [[Observable]] is emitted, it stops emitting items
+ * from the earlier-emitted [[Observable]] and begins emitting items from the new one.
+ *
+ * $noDefaultScheduler
+ *
+ * @return an [[Observable]] that emits the items emitted by the [[Observable]] most recently emitted by the source
+ * [[Observable]]
+ * @see ReactiveX operators documentation: Switch
+ */
+ @Experimental
+ def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
+ val o2: Observable[Observable[U]] = o
+ val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
+ val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
+ val o5 = rx.Observable.switchOnNextDelayError[U](o4)
+ toScalaObservable[U](o5)
+ }
+
+ /**
+ * $experimental Returns a new [[Observable]] by applying a function that you supply to each item emitted by the source
+ * [[Observable]] that returns an [[Observable]], and then emitting the items emitted by the most recently emitted
+ * of these [[Observable]]s and delays any error until all [[Observable]]s terminate.
+ *
+ *
+ *
+ * $noDefaultScheduler
+ *
+ * @param f a function that, when applied to an item emitted by the source [[Observable]], returns an [[Observable]]
+ * @return an [[Observable]] that emits the items emitted by the [[Observable]] returned from applying `f` to the most
+ * recently emitted item emitted by the source [[Observable]]
+ * @see ReactiveX operators documentation: FlatMap
+ */
+ @Experimental
+ def switchMap[R](f: T => Observable[R]): Observable[R] = {
+ val jf: rx.functions.Func1[T, rx.Observable[_ <: R]] = (t: T) => f(t).asJavaObservable
+ toScalaObservable[R](o.asJavaObservable.switchMapDelayError[R](jf))
+ }
+
+ /**
+ * $experimental Concatenates the [[Observable]] sequence of [[Observable]]s into a single sequence by subscribing to
+ * each inner [[Observable]], one after the other, one at a time and delays any errors till the all inner and the
+ * outer [[Observable]]s terminate.
+ *
+ * $supportBackpressure
+ *
+ * $noDefaultScheduler
+ *
+ * @return the new [[Observable]] with the concatenating behavior
+ */
+ @Experimental
+ def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
+ val o2: Observable[Observable[U]] = o
+ val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
+ val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
+ val o5 = rx.Observable.concatDelayError[U](o4)
+ toScalaObservable[U](o5)
+ }
+
+ /**
+ * $experimental Maps each of the items into an [[Observable]], subscribes to them one after the other,
+ * one at a time and emits their values in order while delaying any error from either this or any of the inner [[Observable]]s
+ * till all of them terminate.
+ *
+ * $supportBackpressure
+ *
+ * $noDefaultScheduler
+ *
+ * @param f the function that maps the items of this [[Observable]] into the inner [[Observable]]s.
+ * @return the new [[Observable]] instance with the concatenation behavior
+ */
+ @Experimental
+ def concatMap[R](f: T => Observable[R]): Observable[R] = {
+ val jf: rx.functions.Func1[T, rx.Observable[_ <: R]] = (t: T) => f(t).asJavaObservable
+ toScalaObservable[R](o.asJavaObservable.concatMapDelayError[R](jf))
+ }
+}
diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
index e3756f2d..83133dc8 100644
--- a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
+++ b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
@@ -62,6 +62,9 @@ class ObservableCompletenessKit extends CompletenessKit {
"collect(Func0[R], Action2[R, _ >: T])" -> "[TODO: See /~https://github.com/ReactiveX/RxScala/issues/63]",
"compose(Transformer[_ >: T, _ <: R])" -> "[use extension methods instead]",
"concatMapEager(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "concatMapEager(Int, T => Observable[R])",
+ "concatMapEager(Func1[_ >: T, _ <: Observable[_ <: R]], Int, Int)" -> "concatMapEager(Int, Int, T => Observable[R])",
+ "concatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: R]])" -> "[use `concatMap(t => f(t).toObservable)`]",
+ "concatMapDelayError(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "[use `delayError.concatMap(T => Observable[R])`]",
"concatWith(Observable[_ <: T])" -> "[use `o1 ++ o2`]",
"contains(Any)" -> "contains(U)",
"count()" -> "length",
@@ -96,6 +99,8 @@ class ObservableCompletenessKit extends CompletenessKit {
"limit(Int)" -> "take(Int)",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapWith(T => Observable[U])((T, U) => R)",
"flatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapIterableWith(T => Iterable[U])((T, U) => R)",
+ "flatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: R]], Int)" -> "[use `flatMap(int, t => collectionSelector(t).toObservable)`]",
+ "flatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R], Int)" -> "[use `flatMapWith(Int, t => collectionSelector(t).toObservable)(resultSelector)`]",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R], Int)" -> "flatMapWith(Int, T => Observable[U])((T, U) => R)",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R])",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Func1[_ >: Throwable, _ <: Observable[_ <: R]], Func0[_ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R], Throwable => Observable[R], () => Observable[R])",
@@ -103,6 +108,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"mergeWith(Observable[_ <: T])" -> "merge(Observable[U])",
"ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])`]",
"onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, => Unit)",
+ "onBackpressureBuffer(Long, Action0, Strategy)" -> "[TODO]",
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Throwable => Observable[U])",
"onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)",
@@ -142,6 +148,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)",
"subscribe()" -> "subscribe()",
"switchIfEmpty(Observable[_ <: T])" -> "switchIfEmpty(Observable[U])",
+ "switchMapDelayError(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "[use `delayError.switchMap(T => Observable[R])`]",
"takeFirst(Func1[_ >: T, Boolean])" -> "[use `filter(condition).take(1)`]",
"takeLast(Int)" -> "takeRight(Int)",
"takeLast(Long, TimeUnit)" -> "takeRight(Duration)",
@@ -188,10 +195,15 @@ class ObservableCompletenessKit extends CompletenessKit {
// manually added entries for Java static methods
"amb(Iterable[_ <: Observable[_ <: T]])" -> "amb(Observable[T]*)",
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)",
- "combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
- "combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]])(Seq[T] => R)",
- "combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "[use `combineLatest(iter.toSeq)(Seq[T] => R)`]",
+ "create(SyncOnSubscribe[S, T])" -> "[TODO]",
+ "create(AsyncOnSubscribe[S, T])" -> "[TODO]",
+ "combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatestWith(Observable[U])((T, U) => R)",
+ "combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
+ "combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
+ "combineLatestDelayError(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
+ "concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.concat`]",
+ "concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.concat`]",
"concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])",
"concatEager(Observable[_ <: Observable[_ <: T]], Int)" -> "concatEager(Int)(<:<[Observable[T], Observable[Observable[U]]])",
"concatEager(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concatEager`]",
@@ -211,16 +223,17 @@ class ObservableCompletenessKit extends CompletenessKit {
"merge(Iterable[_ <: Observable[_ <: T]])" -> "[use `Observable.from(iter).flatten`]",
"merge(Array[Observable[_ <: T]], Int)" -> "[use `Observable.from(array).flatten(n)`]",
"merge(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `Observable.from(iter).flatten(n)`]",
- "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
- "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
- "mergeDelayError(Observable[_ <: Observable[_ <: T]], Int)" -> "flattenDelayError(Int)(<:<[Observable[T], Observable[Observable[U]]])",
- "mergeDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.flattenDelayError`]",
- "mergeDelayError(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `iter.toObservable.flattenDelayError(Int)`]",
+ "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "[use `delayError.merge(Observable[U])`]",
+ "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.flatten(<:<[Observable[T], Observable[Observable[U]]])`]",
+ "mergeDelayError(Observable[_ <: Observable[_ <: T]], Int)" -> "[use `delayError.flatten(Int)(<:<[Observable[T], Observable[Observable[U]]])`]",
+ "mergeDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.flatten`]",
+ "mergeDelayError(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `iter.toObservable.delayError.flatten(Int)`]",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "sequenceEqual(Observable[U])",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "sequenceEqualWith(Observable[U])((U, U) => Boolean)",
"range(Int, Int)" -> commentForRange,
"range(Int, Int, Scheduler)" -> "[use `(start until end).toObservable.subscribeOn(scheduler)` instead of `range(start, count, scheduler)`]",
"switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])",
+ "switchOnNextDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.switch(<:<[Observable[T], Observable[Observable[U]]])`]",
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource])" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource], Boolean)" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
"withLatestFrom(Observable[_ <: U], Func2[_ >: T, _ >: U, _ <: R])" -> "withLatestFrom(Observable[U])((T, U) => R)",