Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to using RxJava 1.0.11 #166

Merged
merged 23 commits into from
May 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7da715a
Update RxJava dependency to 1.0.11
jbripley May 24, 2015
bd960ce
Move all Experimental/Beta methods from ExperimentalAPIs.scala to Obs…
jbripley May 24, 2015
2ee04ab
Update completeness test with Experimental/Beta Observable methods move
jbripley May 24, 2015
0b72f74
Add new RxScala mapping to Experimental Subject methods added in RxJa…
jbripley May 24, 2015
c4ba2a3
Add new RxScala mapping to Experimental/Beta Observable methods added…
jbripley May 24, 2015
d08bd1f
Add manual completeness test mapping to Experimental/Beta Observable …
jbripley May 24, 2015
f23f5e3
Deprecated methods rather than removing them to maintain backward com…
zsxwing May 25, 2015
ddf7353
Use default parameter value to combine two variants of using into one
zsxwing May 25, 2015
8a7b8f2
Fix doc warnings
zsxwing May 23, 2015
01cf488
Add experimental and beta labels
zsxwing May 25, 2015
f97b22d
Fix Subject.getValues signature
zsxwing May 25, 2015
c752c32
Add experimental/beta labels for the rest methods
zsxwing May 25, 2015
3fdde5e
Merge pull request #1 from zsxwing/pr166
jbripley May 25, 2015
d617411
Move maxConcurrent parameter first so we can simplify name of flatMap…
jbripley May 25, 2015
78aee10
Update the deprecated messages
zsxwing May 26, 2015
391b796
Merge pull request #2 from zsxwing/pr166
jbripley May 26, 2015
99c3b07
Move maxConcurrent parameter first for all flatMap variants
jbripley May 26, 2015
ce36a17
Fix the deprecated messages for 'flatMap'
zsxwing May 27, 2015
b48ccc7
Merge pull request #3 from zsxwing/pr166
jbripley May 27, 2015
97a9829
Update the signature of flatMapWith and add an example
zsxwing May 29, 2015
0a8e0c5
Fix the Subject doc
zsxwing May 29, 2015
6d329f0
Remove redundant 'Int's
zsxwing May 29, 2015
faa17cf
Merge pull request #4 from zsxwing/pr166
jbripley May 29, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ scalaVersion in ThisBuild := "2.11.6"
crossScalaVersions in ThisBuild := Seq("2.10.5", "2.11.6")

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.0.8",
"io.reactivex" % "rxjava" % "1.0.11",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.junit.Test

import rx.lang.scala._
import rx.lang.scala.schedulers._
import rx.lang.scala.ExperimentalAPIs._

@Ignore
object ExperimentalAPIExamples {
Expand Down Expand Up @@ -75,7 +74,7 @@ object ExperimentalAPIExamples {
@Test def onBackpressureBufferWithCapacityExample(): Unit = {
Observable[Int](subscriber => {
(1 to 200).foreach(subscriber.onNext)
}).onBackpressureBufferWithCapacity(200).observeOn(IOScheduler()).subscribe {
}).onBackpressureBuffer(200).observeOn(IOScheduler()).subscribe {
v =>
Thread.sleep(10) // A slow consumer
println(s"process $v")
Expand All @@ -85,7 +84,7 @@ object ExperimentalAPIExamples {
@Test def onBackpressureBufferWithCapacityExample2(): Unit = {
Observable[Int](subscriber => {
(1 to 200).foreach(subscriber.onNext)
}).onBackpressureBufferWithCapacity(10, println("Overflow")).observeOn(IOScheduler()).subscribe(
}).onBackpressureBuffer(10, println("Overflow")).observeOn(IOScheduler()).subscribe(
v => {
Thread.sleep(10)
// A slow consumer
Expand Down Expand Up @@ -129,25 +128,36 @@ object ExperimentalAPIExamples {
@Test def flatMapWithMaxConcurrentExample(): Unit = {
(1 to 1000000).toObservable
.doOnNext(v => println(s"Emitted Value: $v"))
.flatMapWithMaxConcurrent((v: Int) => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()), 10)
.flatMap(maxConcurrent = 10, v => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()))
.toBlocking.foreach(v => System.out.println("Received: " + v))
}

@Test def flatMapWithMaxConcurrentExample2(): Unit = {
(1 to 1000000).toObservable
.doOnNext(v => println(s"Emitted Value: $v"))
.flatMapWithMaxConcurrent(
(v: Int) => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
.flatMap(
maxConcurrent = 10,
v => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
e => Observable.just(-1).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
() => Observable.just(Int.MaxValue).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
10)
() => Observable.just(Int.MaxValue).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler())
)
.toBlocking.foreach(v => System.out.println("Received: " + v))
}

@Test def flatMapWithMaxConcurrentExample3() {
(1 to 1000000).toObservable
.doOnNext(v => println(s"Emitted Value: $v"))
.flatMapWith(
maxConcurrent = 10,
v => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler())
)(_ * _).subscribeOn(IOScheduler())
.toBlocking.foreach(v => System.out.println("Received: " + v))
}

@Test def onBackpressureDropDoExample(): Unit = {
Observable[Int](subscriber => {
(1 to 200).foreach(subscriber.onNext)
}).onBackpressureDropDo {
}).onBackpressureDrop {
t => println(s"Dropping $t")
}.observeOn(IOScheduler()).subscribe {
v =>
Expand Down
16 changes: 15 additions & 1 deletion src/main/scala/rx/lang/scala/ExperimentalAPIs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import JavaConversions._
*
* `import rx.lang.scala.ExperimentalAPIs._` to enable them.
*/
@deprecated("Use new methods in [[Observable]] instead. This is kept here only for backward compatibility.", "0.25.0")
class ExperimentalObservable[+T](private val o: Observable[T]) {

/**
Expand All @@ -49,6 +50,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* @return an [[Observable]] that will block the producer thread if the source emits items faster than its [[Observer]] can consume them
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@deprecated("Use [[[Observable.onBackpressureBlock(maxQueueLength:Int)*]]] instead. This is kept here only for backward compatibility.", "0.25.0")
def onBackpressureBlock(maxQueueLength: Int): Observable[T] = {
o.asJavaObservable.onBackpressureBlock(maxQueueLength)
}
Expand All @@ -70,6 +72,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* @return an [[Observable]] that will block the producer thread if the source emits items faster than its [[Observer]] can consume them
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@deprecated("Use [[[Observable.onBackpressureBlock:*]]] instead. This is kept here only for backward compatibility.", "0.25.0")
def onBackpressureBlock: Observable[T] = {
o.asJavaObservable.onBackpressureBlock()
}
Expand All @@ -83,6 +86,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* @return an [[Observable]] that will call `onRequest` when appropriate
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@deprecated("Use [[Observable.doOnRequest]] instead. This is kept here only for backward compatibility.", "0.25.0")
def doOnRequest(onRequest: Long => Unit): Observable[T] = {
o.asJavaObservable.doOnRequest(new Action1[java.lang.Long] {
override def call(request: java.lang.Long): Unit = onRequest(request)
Expand All @@ -103,6 +107,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* @return an [[Observable]] that will buffer items up to the given capacity
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@deprecated("Use [[[Observable.onBackpressureBuffer(capacity:Long)*]]] instead. This is kept here only for backward compatibility.", "0.25.0")
def onBackpressureBufferWithCapacity(capacity: Long): Observable[T] = {
// Use `onBackpressureBufferWithCapacity` because if not, it will conflict with `Observable.onBackpressureBuffer`
o.asJavaObservable.onBackpressureBuffer(capacity)
Expand All @@ -124,6 +129,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@deprecated("Use [[[Observable.onBackpressureBuffer(capacity:Long,onOverflow:=>Unit)*]]] instead. This is kept here only for backward compatibility.", "0.25.0")
def onBackpressureBufferWithCapacity(capacity: Long, onOverflow: => Unit): Observable[T] = {
// Use `onBackpressureBufferWithCapacity` because if not, it will conflict with `Observable.onBackpressureBuffer`
o.asJavaObservable.onBackpressureBuffer(capacity, new Action0 {
Expand All @@ -141,6 +147,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* @return an [[Observable]] that emits the items emitted by the source [[Observable]] or the items of an
* alternate [[Observable]] if the source [[Observable]] is empty.
*/
@deprecated("Use [[Observable.switchIfEmpty]] instead. This is kept here only for backward compatibility.", "0.25.0")
def switchIfEmpty[U >: T](alternate: Observable[U]): Observable[U] = {
val jo = o.asJavaObservable.asInstanceOf[rx.Observable[U]]
toScalaObservable[U](jo.switchIfEmpty(alternate.asJavaObservable))
Expand All @@ -161,6 +168,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* `resultSelector` function only when the source [[Observable]] sequence (this instance) emits an item
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
*/
@deprecated("Use [[Observable.withLatestFrom]] instead. This is kept here only for backward compatibility.", "0.25.0")
def withLatestFrom[U, R](other: Observable[U])(resultSelector: (T, U) => R): Observable[R] = {
val func = new Func2[T, U, R] {
override def call(t1: T, t2: U): R = resultSelector(t1, t2)
Expand All @@ -185,6 +193,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
* @see [[Observable.takeWhile]]
*/
@deprecated("Use [[[Observable.takeUntil(stopPredicate:T=>Boolean)*]]] instead. This is kept here only for backward compatibility.", "0.25.0")
def takeUntil(stopPredicate: T => Boolean): Observable[T] = {
val func = new Func1[T, java.lang.Boolean] {
override def call(t: T): java.lang.Boolean = stopPredicate(t)
Expand All @@ -206,6 +215,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* by the source [[Observable]] and merging the results of the [[Observable]]s obtained from this transformation
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@deprecated("Use [[[Observable.flatMap[R](maxConcurrent:Int,f:T=>rx\\.lang\\.scala\\.Observable[R])*]]] instead. This is kept here only for backward compatibility.", "0.25.0")
def flatMapWithMaxConcurrent[R](f: T => Observable[R], maxConcurrent: Int): Observable[R] = {
toScalaObservable[R](o.asJavaObservable.flatMap[R](new Func1[T, rx.Observable[_ <: R]] {
def call(t1: T): rx.Observable[_ <: R] = {
Expand All @@ -229,6 +239,7 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* specified functions to the emissions and notifications of the source [[Observable]]
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@deprecated("Use [[[Observable.flatMap[R](maxConcurrent:Int,onNext:T=>rx\\.lang\\.scala\\.Observable[R],onError:Throwable=>rx\\.lang\\.scala\\.Observable[R],onCompleted:()=>rx\\.lang\\.scala\\.Observable[R])*]]] instead. This is kept here only for backward compatibility.", "0.25.0")
def flatMapWithMaxConcurrent[R](onNext: T => Observable[R], onError: Throwable => Observable[R], onCompleted: () => Observable[R], maxConcurrent: Int): Observable[R] = {
val jOnNext = new Func1[T, rx.Observable[_ <: R]] {
override def call(t: T): rx.Observable[_ <: R] = onNext(t).asJavaObservable
Expand Down Expand Up @@ -257,13 +268,16 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
* @return an new [[Observable]] that will drop `onNext` notifications on overflow
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@deprecated("Use [[Observable.onBackpressureDrop(onDrop:T=>Unit)*]] instead. This is kept here only for backward compatibility.", "0.25.0")
def onBackpressureDropDo(onDrop: T => Unit): Observable[T] = {
toScalaObservable[T](o.asJavaObservable.onBackpressureDrop(new Action1[T] {
override def call(t: T) = onDrop(t)
}))
}
}

@deprecated("Use new methods in [[Observable]] instead. This is kept here only for backward compatibility.", "0.25.0")
object ExperimentalAPIs {
implicit def toExperimentalObservable[T](o: Observable[T]): ExperimentalObservable[T] = new ExperimentalObservable(o)
@deprecated("Use new methods in [[Observable]] instead. This is kept here only for backward compatibility.", "0.25.0")
def toExperimentalObservable[T](o: Observable[T]): ExperimentalObservable[T] = new ExperimentalObservable(o)
}
Loading