Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump to RxJava 1.1.5 #194

Merged
merged 3 commits into from
Jun 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4")
parallelExecution in Test := false

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.1.1",
"io.reactivex" % "rxjava" % "1.1.5",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")
Expand Down
138 changes: 135 additions & 3 deletions examples/src/test/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -409,13 +409,28 @@ class RxScalaDemo extends JUnitSuite {
val firstCounter = Observable.interval(250 millis)
val secondCounter = Observable.interval(550 millis)
val thirdCounter = Observable.interval(850 millis)
val sources = Seq(firstCounter, secondCounter, thirdCounter)
val sources = Iterable(firstCounter, secondCounter, thirdCounter)
val combinedCounter = Observable.combineLatest(sources)(_.toList).take(10)

combinedCounter subscribe {x => println(s"Emitted group: $x")}
waitFor(combinedCounter)
}

@Test def combineLatestDelayErrorExample() {
val firstCounter = Observable.just(1) ++ Observable.error(new RuntimeException("Oops!"))
val secondCounter = Observable.interval(550 millis).take(3)
val sources = Iterable(firstCounter, secondCounter)

Observable.combineLatest(sources)(_.toList).subscribe(
i => println("combineLatest: " + i),
e => println("combineLatest: " + e.getMessage))
Thread.sleep(2000)
Observable.combineLatestDelayError(sources)(_.toList).subscribe(
i => println("combineLatestDelayError: " + i),
e => println("combineLatestDelayError: " + e.getMessage))
Thread.sleep(2000)
}

@Test def olympicsExampleWithoutPublish() {
val medals = Olympics.mountainBikeMedals.doOnEach(_ => println("onNext"))
medals.subscribe(println(_)) // triggers an execution of medals Observable
Expand Down Expand Up @@ -1489,6 +1504,20 @@ class RxScalaDemo extends JUnitSuite {
.toBlocking.foreach(println)
}

@Test def concatMapDelayErrorExample() {
println("=== concatMap ===")
(1 to 10).toObservable
.concatMap { i =>
if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i)
}.subscribe(println(_), _.printStackTrace)

println("=== delayError.concatMap ===")
(1 to 10).toObservable
.delayError.concatMap { i =>
if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i)
}.subscribe(println(_), _.printStackTrace)
}

@Test def onErrorResumeNextExample() {
val o = Observable {
(subscriber: Subscriber[Int]) =>
Expand All @@ -1512,13 +1541,78 @@ class RxScalaDemo extends JUnitSuite {
}.subscribe(println(_))
}

@Test def switchExample(): Unit = {
Observable.interval(300 millis).take(3).map { n =>
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
}.switch.take(5).subscribe(println(_))
}

@Test def switchDelayErrorExample(): Unit = {
println("=== switch ===")
Observable.interval(300 millis).take(3).map { n =>
if (n == 0) {
Observable.error(new RuntimeException("Oops!"))
} else {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
}
}.switch.subscribe(println(_), _.printStackTrace())

Thread.sleep(2000)

println("=== delayError.switch ===")
Observable.interval(300 millis).take(3).map { n =>
if (n == 0) {
Observable.error(new RuntimeException("Oops!"))
} else {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
}
}.delayError.switch.subscribe(println(_), _.printStackTrace())
}

@Test def switchMapExample() {
val o = Observable.interval(300 millis).take(5).switchMap[String] {
n => Observable.interval(50 millis).take(10).map(i => s"Seq ${n}: ${i}")
}
o.toBlocking.foreach(println)
}

@Test def switchMapDelayErrorExample() {
println("=== switchMap ===")
Observable.interval(300 millis).take(3).switchMap { n =>
if (n == 0) {
Observable.error(new RuntimeException("Oops!"))
} else {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
}
}.subscribe(println(_), _.printStackTrace())

Thread.sleep(2000)

println("=== delayError.switchMap ===")
Observable.interval(300 millis).take(3).delayError.switchMap { n =>
if (n == 0) {
Observable.error(new RuntimeException("Oops!"))
} else {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
}
}.subscribe(println(_), _.printStackTrace())
}

@Test def joinExample() {
val o1 = Observable.interval(500 millis).map(n => "1: " + n)
val o2 = Observable.interval(100 millis).map(n => "2: " + n)
Expand Down Expand Up @@ -1591,6 +1685,7 @@ class RxScalaDemo extends JUnitSuite {
@Test def onBackpressureDropExample() {
val o = createFastObservable.onBackpressureDrop
val l = new CountDownLatch(1)
// Use a small buffer to demonstrate the drop behavior
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
override def onStart() {
request(1)
Expand Down Expand Up @@ -1713,13 +1808,19 @@ class RxScalaDemo extends JUnitSuite {
}).subscribe(println(_))
}

@Test def concatMapEagerExample3(): Unit = {
(0 until 10).toObservable.concatMapEager(capacityHint = 10, maxConcurrent = 3, i => {
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
}).subscribe(println(_))
}

@Test def flattenDelayErrorExample() {
val o1 = Observable.just(1).delay(200 millis).
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
Observable.just(o1, o2, o3, o4).flattenDelayError.subscribe(println(_), _.printStackTrace())
Observable.just(o1, o2, o3, o4).delayError.flatten.subscribe(println(_), _.printStackTrace())
}

@Test def flattenDelayErrorExample2() {
Expand All @@ -1728,7 +1829,7 @@ class RxScalaDemo extends JUnitSuite {
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
Observable.just(o1, o2, o3, o4).flattenDelayError(2).subscribe(println(_), _.printStackTrace())
Observable.just(o1, o2, o3, o4).delayError.flatten(2).subscribe(println(_), _.printStackTrace())
}

@Test def blockingObservableSubscribeExample(): Unit = {
Expand Down Expand Up @@ -1778,4 +1879,35 @@ class RxScalaDemo extends JUnitSuite {
println("3rd Observer is subscribing")
o.subscribe(i => println(s"s3: $i"))
}

def concatDelayErrorExample(): Unit = {
val o1 = Observable.error(new RuntimeException("Oops!"))
val o2 = Observable.just(1, 2, 3)
val os = Observable.just(o1, o2)
os.concat.subscribe(i => println("concat: " + i), e => println("concat: " + e.getMessage))
os.delayError.concat.subscribe(i => println("concatDelayError: " + i), e => println("concatDelayError: " + e.getMessage))
}

def onTerminateDetachExample(): Unit = {
{
var o = new Object
val weakReference = new scala.ref.WeakReference[Object](o)
// s will have a reference to "o" without onTerminateDetach
val s = Observable.just(o).size.subscribe(println(_))
o = null
System.gc() // It's fine for a demo even if it's just a best effort
Thread.sleep(2000)
println(s"without onTerminateDetach: isUnsubscribed=${s.isUnsubscribed}, weakReference=${weakReference.get}")
}
{
var o = new Object
val weakReference = new scala.ref.WeakReference[Object](o)
// s won't have a reference to "o" since they are detached.
val s = Observable.just(o).size.onTerminateDetach.subscribe(println(_))
o = null
System.gc() // It's fine for a demo even if it's just a best effort
Thread.sleep(2000)
println(s"onTerminateDetach: isUnsubscribed=${s.isUnsubscribed}, weakReference=${weakReference.get}")
}
}
}
Loading