Skip to content

Commit

Permalink
Merge pull request #52 from zsxwing/rxjava-1.0.0-RC9
Browse files Browse the repository at this point in the history
Update to RxJava 1.0.0-RC9
  • Loading branch information
zsxwing committed Nov 3, 2014
2 parents 4ec8f40 + 6c190b2 commit 19c9aa1
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 101 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ scalaVersion in ThisBuild := "2.11.2"
crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2")

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.0.0-rc.8",
"io.reactivex" % "rxjava" % "1.0.0-rc.9",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
29 changes: 0 additions & 29 deletions examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -441,27 +441,12 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def exampleWithPublish2() {
val unshared = Observable.from(1 to 4)
val shared = unshared.publish(0)
shared.subscribe(n => println(s"subscriber 1 gets $n"))
shared.subscribe(n => println(s"subscriber 2 gets $n"))
shared.connect
}

@Test def exampleWithPublish3() {
val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2))
o.subscribe(n => println(s"subscriber 1 gets $n"))
o.subscribe(n => println(s"subscriber 2 gets $n"))
Thread.sleep(1000)
}

@Test def exampleWithPublish4() {
val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2), -1L)
o.subscribe(n => println(s"subscriber 1 gets $n"))
o.subscribe(n => println(s"subscriber 2 gets $n"))
Thread.sleep(1000)
}

def doLater(waitTime: Duration, action: () => Unit): Unit = {
Observable.interval(waitTime).take(1).subscribe(_ => action())
}
Expand Down Expand Up @@ -1342,20 +1327,6 @@ class RxScalaDemo extends JUnitSuite {
hot.takeRight(1).subscribe(n => println(s"subscriber 2 gets $n"))
}

@Test def publishLastExample() {
val hot = createAHotObservable
val o = hot.publishLast
o.subscribe(n => println(s"subscriber 1 gets $n"))
o.subscribe(n => println(s"subscriber 2 gets $n"))
o.connect
}

@Test def publishLastExample2() {
val hot = createAHotObservable
val o = hot.publishLast(co => co ++ co) // "++" subscribes "co" twice
o.subscribe(n => println(s"subscriber gets $n"))
}

@Test def unsubscribeOnExample() {
val o = Observable[String] {
subscriber =>
Expand Down
69 changes: 0 additions & 69 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1638,20 +1638,6 @@ trait Observable[+T]
new ConnectableObservable[T](asJavaObservable.publish())
}


/**
* Returns a ConnectableObservable that emits `initialValue` followed by the items emitted by `this` Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishConnect.i.png">
*
* @param initialValue the initial value to be emitted by the resulting ConnectableObservable
* @return a `ConnectableObservable` that shares a single subscription to the underlying Observable and starts with `initialValue`
*/
def publish[T](initialValue: T): ConnectableObservable[T] = {
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
new ConnectableObservable[T](thisJava.publish(initialValue))
}

/**
* Returns an Observable that emits the results of invoking a specified selector on items emitted by a `ConnectableObservable`
* that shares a single subscription to the underlying sequence.
Expand All @@ -1671,61 +1657,6 @@ trait Observable[+T]
toScalaObservable[R](thisJava.publish(fJava))
}

/**
* Returns an Observable that emits `initialValue` followed by the results of invoking a specified
* selector on items emitted by a `ConnectableObservable` that shares a single subscription to the
* source Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishConnect.if.png">
*
* @param selector a function that can use the multicasted source sequence as many times as needed, without
* causing multiple subscriptions to the source Observable. Subscribers to the source will
* receive all notifications of the source from the time of the subscription forward
* @param initialValue the initial value of the underlying `BehaviorSubject`
* @return an Observable that emits `initialValue` followed by the results of invoking the selector
* on a `ConnectableObservable` that shares a single subscription to the underlying Observable
*/
def publish[R](selector: Observable[T] => Observable[R], initialValue: T @uncheckedVariance): Observable[R] = {
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
(jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
toScalaObservable[R](thisJava.publish(fJava, initialValue))
}

/**
* Returns a [[ConnectableObservable]] that emits only the last item emitted by the source Observable.
* A [[ConnectableObservable]] resembles an ordinary Observable, except that it does not begin emitting items
* when it is subscribed to, but only when its `connect` method is called.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishLast.png">
*
* @return a [[ConnectableObservable]] that emits only the last item emitted by the source Observable
*/
def publishLast: ConnectableObservable[T] = {
new ConnectableObservable[T](asJavaObservable.publishLast())
}

/**
* Returns an Observable that emits an item that results from invoking a specified selector on the last item
* emitted by a [[ConnectableObservable]] that shares a single subscription to the source Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishLast.f.png">
*
* @param selector a function that can use the multicasted source sequence as many times as needed, without
* causing multiple subscriptions to the source Observable. Subscribers to the source will only
* receive the last item emitted by the source.
* @return an Observable that emits an item that is the result of invoking the selector on a [[ConnectableObservable]]
* that shares a single subscription to the source Observable
*/
def publishLast[R](selector: Observable[T] => Observable[R]): Observable[R] = {
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
val fJava = new rx.functions.Func1[rx.Observable[T], rx.Observable[R]]() {
override def call(jo: rx.Observable[T]): rx.Observable[R] =
selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
}
toScalaObservable[R](thisJava.publishLast(fJava))
}

// TODO add Scala-like aggregate function

/**
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/rx/lang/scala/Subject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ trait Subject[T] extends Observable[T] with Observer[T] {
override def onNext(value: T): Unit = { asJavaObserver.onNext(value)}
override def onError(error: Throwable): Unit = { asJavaObserver.onError(error) }
override def onCompleted() { asJavaObserver.onCompleted() }

/**
* Indicates whether the [[Subject]] has [[Observer]]s subscribed to it.
* @return `true` if there is at least one [[Observer]] subscribed to this [[Subject]], `false` otherwise
*/
def hasObservers: Boolean = asJavaSubject.hasObservers()
}

/**
Expand Down
2 changes: 0 additions & 2 deletions src/test/scala/rx/lang/scala/CompletenessTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ class CompletenessTest extends JUnitSuite {
"onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)",
"onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])",
"publish(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publish(Observable[T] => Observable[R])",
"publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[T] => Observable[R], T @uncheckedVariance)",
"publishLast(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publishLast(Observable[T] => Observable[R])",
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
"reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
"repeatWhen(Func1[_ >: Observable[_ <: Void], _ <: Observable[_]])" -> "repeatWhen(Observable[Unit] => Observable[Any])",
Expand Down

0 comments on commit 19c9aa1

Please sign in to comment.