From fc832fdc4ab3e1b759147024ae4677374afc230b Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 3 Nov 2014 13:33:31 +0800 Subject: [PATCH 1/2] Add hasObservers method to Subject --- build.sbt | 2 +- src/main/scala/rx/lang/scala/Subject.scala | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 5fe93356..d8bf1f45 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,7 @@ scalaVersion in ThisBuild := "2.11.2" crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2") libraryDependencies ++= Seq( - "io.reactivex" % "rxjava" % "1.0.0-rc.8", + "io.reactivex" % "rxjava" % "1.0.0-rc.9", "org.mockito" % "mockito-core" % "1.9.5" % "test", "junit" % "junit" % "4.11" % "test", "org.scalatest" %% "scalatest" % "2.2.2" % "test") diff --git a/src/main/scala/rx/lang/scala/Subject.scala b/src/main/scala/rx/lang/scala/Subject.scala index e5cb8a72..2550d757 100644 --- a/src/main/scala/rx/lang/scala/Subject.scala +++ b/src/main/scala/rx/lang/scala/Subject.scala @@ -27,6 +27,12 @@ trait Subject[T] extends Observable[T] with Observer[T] { override def onNext(value: T): Unit = { asJavaObserver.onNext(value)} override def onError(error: Throwable): Unit = { asJavaObserver.onError(error) } override def onCompleted() { asJavaObserver.onCompleted() } + + /** + * Indicates whether the [[Subject]] has [[Observer]]s subscribed to it. + * @return `true` if there is at least one [[Observer]] subscribed to this [[Subject]], `false` otherwise + */ + def hasObservers: Boolean = asJavaSubject.hasObservers() } /** From 6c190b20e60d5a0c0a189f1346f2fc5a60de8f75 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 3 Nov 2014 13:42:11 +0800 Subject: [PATCH 2/2] Remove PublishLast/Publish(initialValue) --- .../rx/lang/scala/examples/RxScalaDemo.scala | 29 -------- src/main/scala/rx/lang/scala/Observable.scala | 69 ------------------- .../rx/lang/scala/CompletenessTest.scala | 2 - 3 files changed, 100 deletions(-) diff --git a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala index 92ffef3e..ae91ec8c 100755 --- a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -441,27 +441,12 @@ class RxScalaDemo extends JUnitSuite { } @Test def exampleWithPublish2() { - val unshared = Observable.from(1 to 4) - val shared = unshared.publish(0) - shared.subscribe(n => println(s"subscriber 1 gets $n")) - shared.subscribe(n => println(s"subscriber 2 gets $n")) - shared.connect - } - - @Test def exampleWithPublish3() { val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2)) o.subscribe(n => println(s"subscriber 1 gets $n")) o.subscribe(n => println(s"subscriber 2 gets $n")) Thread.sleep(1000) } - @Test def exampleWithPublish4() { - val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2), -1L) - o.subscribe(n => println(s"subscriber 1 gets $n")) - o.subscribe(n => println(s"subscriber 2 gets $n")) - Thread.sleep(1000) - } - def doLater(waitTime: Duration, action: () => Unit): Unit = { Observable.interval(waitTime).take(1).subscribe(_ => action()) } @@ -1342,20 +1327,6 @@ class RxScalaDemo extends JUnitSuite { hot.takeRight(1).subscribe(n => println(s"subscriber 2 gets $n")) } - @Test def publishLastExample() { - val hot = createAHotObservable - val o = hot.publishLast - o.subscribe(n => println(s"subscriber 1 gets $n")) - o.subscribe(n => println(s"subscriber 2 gets $n")) - o.connect - } - - @Test def publishLastExample2() { - val hot = createAHotObservable - val o = hot.publishLast(co => co ++ co) // "++" subscribes "co" twice - o.subscribe(n => println(s"subscriber gets $n")) - } - @Test def unsubscribeOnExample() { val o = Observable[String] { subscriber => diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala index a7886032..bae4636e 100755 --- a/src/main/scala/rx/lang/scala/Observable.scala +++ b/src/main/scala/rx/lang/scala/Observable.scala @@ -1638,20 +1638,6 @@ trait Observable[+T] new ConnectableObservable[T](asJavaObservable.publish()) } - - /** - * Returns a ConnectableObservable that emits `initialValue` followed by the items emitted by `this` Observable. - *

- * - * - * @param initialValue the initial value to be emitted by the resulting ConnectableObservable - * @return a `ConnectableObservable` that shares a single subscription to the underlying Observable and starts with `initialValue` - */ - def publish[T](initialValue: T): ConnectableObservable[T] = { - val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]] - new ConnectableObservable[T](thisJava.publish(initialValue)) - } - /** * Returns an Observable that emits the results of invoking a specified selector on items emitted by a `ConnectableObservable` * that shares a single subscription to the underlying sequence. @@ -1671,61 +1657,6 @@ trait Observable[+T] toScalaObservable[R](thisJava.publish(fJava)) } - /** - * Returns an Observable that emits `initialValue` followed by the results of invoking a specified - * selector on items emitted by a `ConnectableObservable` that shares a single subscription to the - * source Observable. - *

- * - * - * @param selector a function that can use the multicasted source sequence as many times as needed, without - * causing multiple subscriptions to the source Observable. Subscribers to the source will - * receive all notifications of the source from the time of the subscription forward - * @param initialValue the initial value of the underlying `BehaviorSubject` - * @return an Observable that emits `initialValue` followed by the results of invoking the selector - * on a `ConnectableObservable` that shares a single subscription to the underlying Observable - */ - def publish[R](selector: Observable[T] => Observable[R], initialValue: T @uncheckedVariance): Observable[R] = { - val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]] - val fJava: Func1[rx.Observable[T], rx.Observable[R]] = - (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] - toScalaObservable[R](thisJava.publish(fJava, initialValue)) - } - - /** - * Returns a [[ConnectableObservable]] that emits only the last item emitted by the source Observable. - * A [[ConnectableObservable]] resembles an ordinary Observable, except that it does not begin emitting items - * when it is subscribed to, but only when its `connect` method is called. - *

- * - * - * @return a [[ConnectableObservable]] that emits only the last item emitted by the source Observable - */ - def publishLast: ConnectableObservable[T] = { - new ConnectableObservable[T](asJavaObservable.publishLast()) - } - - /** - * Returns an Observable that emits an item that results from invoking a specified selector on the last item - * emitted by a [[ConnectableObservable]] that shares a single subscription to the source Observable. - *

- * - * - * @param selector a function that can use the multicasted source sequence as many times as needed, without - * causing multiple subscriptions to the source Observable. Subscribers to the source will only - * receive the last item emitted by the source. - * @return an Observable that emits an item that is the result of invoking the selector on a [[ConnectableObservable]] - * that shares a single subscription to the source Observable - */ - def publishLast[R](selector: Observable[T] => Observable[R]): Observable[R] = { - val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]] - val fJava = new rx.functions.Func1[rx.Observable[T], rx.Observable[R]]() { - override def call(jo: rx.Observable[T]): rx.Observable[R] = - selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] - } - toScalaObservable[R](thisJava.publishLast(fJava)) - } - // TODO add Scala-like aggregate function /** diff --git a/src/test/scala/rx/lang/scala/CompletenessTest.scala b/src/test/scala/rx/lang/scala/CompletenessTest.scala index 5aa251c9..1fdc1743 100644 --- a/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -121,8 +121,6 @@ class CompletenessTest extends JUnitSuite { "onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)", "onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])", "publish(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publish(Observable[T] => Observable[R])", - "publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[T] => Observable[R], T @uncheckedVariance)", - "publishLast(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publishLast(Observable[T] => Observable[R])", "reduce(Func2[T, T, T])" -> "reduce((U, U) => U)", "reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", "repeatWhen(Func1[_ >: Observable[_ <: Void], _ <: Observable[_]])" -> "repeatWhen(Observable[Unit] => Observable[Any])",