-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement SyncOnSubscribe and AsyncOnSubscribe #220
Changes from 4 commits
e210dc7
5b20d2e
6c7b0a0
21a580d
1508f3e
b515226
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,16 @@ sealed trait Notification[+T] { | |
} | ||
|
||
def apply(observer: Observer[T]): Unit = accept(observer) | ||
|
||
// TODO: Should this be public or private to rx.lang.scala? | ||
def map[U](f: T => U): Notification[U] = { | ||
this match { | ||
case Notification.OnNext(value) => Notification.OnNext(f(value)) | ||
// TODO: Or do we want to cast here? Notification.OnError[T] is not a Notification[U] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No cast, because it would be a cast that cannot be checked by the JVM, which might create a compile time warning, which would have to be suppressed etc, to complicated... (Unless someone presents benchmarks suggesting that a cast performs better). |
||
case Notification.OnError(error) => Notification.OnError(error) | ||
case Notification.OnCompleted => Notification.OnCompleted | ||
} | ||
} | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package rx.lang.scala.observables | ||
|
||
import rx.annotations.Experimental | ||
import rx.lang.scala.{Notification, Observable} | ||
|
||
/** | ||
* An utility class to create `Observable`s that start acting when subscribed to and responds | ||
* correctly to back pressure requests from subscribers. | ||
* | ||
* Semantics: | ||
* * `generator` is called to provide an initial state on each new subscription | ||
* * `next` is called with the last state and a `requested` amount of items to provide a new state | ||
* and an `Observable` that (potentially asynchronously) emits up to `requested` items. | ||
* * `onUnsubscribe` is called with the state provides by the last next when the observer unsubscribes | ||
*/ | ||
object AsyncOnSubscribe { | ||
|
||
/** | ||
* Alias for [[AsyncOnSubscribe.stateful]] | ||
* @see [[AsyncOnSubscribe.stateful]] | ||
*/ | ||
@Experimental | ||
def apply[S,T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] = | ||
stateful[S, T](generator)(next, onUnsubscribe) | ||
|
||
/** | ||
* Generates a stateful [[AsyncOnSubscribe]] | ||
* | ||
* @tparam T the type of the generated values | ||
* @tparam S the type of the associated state with each Subscriber | ||
* @param generator generates the initial state value | ||
* @param next produces observables which contain data for the stream | ||
* @param onUnsubscribe clean up behavior | ||
*/ | ||
@Experimental | ||
def stateful[S, T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] = { | ||
// The anonymous class shadows these names | ||
val nextF = next | ||
val onUnsubscribeF = onUnsubscribe | ||
|
||
new rx.observables.AsyncOnSubscribe[S,T] { | ||
import rx.lang.scala.JavaConversions._ | ||
override def generateState(): S = generator() | ||
override def next(state: S, requested: Long, observer: rx.Observer[rx.Observable[_ <: T]]): S = | ||
nextF(state, requested) match { | ||
case (notification, nextState) => | ||
toJavaNotification(notification.map(toJavaObservable)).accept(observer) | ||
nextState | ||
} | ||
override def onUnsubscribe(state: S): Unit = onUnsubscribeF(state) | ||
} | ||
} | ||
|
||
/** | ||
* Generates a [[AsyncOnSubscribe]] which does not generate a new state in `next` | ||
* | ||
* @tparam T the type of the generated values | ||
* @tparam S the type of the associated state with each Subscriber | ||
* @param generator generates the state value | ||
* @param next produces observables which contain data for the stream | ||
* @param onUnsubscribe clean up behavior | ||
*/ | ||
@Experimental | ||
def singleState[S, T](generator: () => S)(next: (S, Long) => Notification[Observable[T]], onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] = | ||
stateful[S, T](generator)((s,r) => (next(s,r), s), onUnsubscribe) | ||
|
||
/** | ||
* Generates a stateless [[AsyncOnSubscribe]], useful when the state is closed over in `next` or the `SyncOnSubscribe` inherently does not have a state | ||
* | ||
* @tparam T the type of the generated values | ||
* @param next produces observables which contain data for the stream | ||
* @param onUnsubscribe clean up behavior | ||
*/ | ||
@Experimental | ||
def stateless[T](next: Long => Notification[Observable[T]], onUnsubscribe: () => Unit = () => ()) = | ||
stateful[Unit, T](() => ())((_,r) => (next(r), ()), _ => onUnsubscribe()) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package rx.lang.scala.observables | ||
|
||
import rx.annotations.Experimental | ||
import rx.lang.scala.{Notification, Observable} | ||
|
||
/** | ||
* An utility class to create `Observable`s that start acting when subscribed to and responds | ||
* correctly to back pressure requests from subscribers. | ||
* | ||
* Semantics: | ||
* * `generator` is called to provide an initial state on each new subscription | ||
* * `next` is called with the last state to provide a data item and a new state for the next `next` call | ||
* * `onUnsubscribe` is called with the state provides by the last next when the observer unsubscribes | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
(also in AsyncOnSubscribe) |
||
*/ | ||
object SyncOnSubscribe { | ||
|
||
/** | ||
* Alias for [[SyncOnSubscribe.stateful]] | ||
* @see [[SyncOnSubscribe.stateful]] | ||
*/ | ||
@Experimental | ||
def apply[S, T](generator: () => S)(next: S => (Notification[T], S), onUnsubscribe: S => Unit = (_:S) => ()): SyncOnSubscribe[S,T] = | ||
stateful[S, T](generator)(next, onUnsubscribe) | ||
|
||
/** | ||
* Generates a stateful [[SyncOnSubscribe]] | ||
* | ||
* @tparam T the type of the generated values | ||
* @tparam S the type of the associated state with each Subscriber | ||
* @param generator generates the initial state value | ||
* @param next produces data for the stream | ||
* @param onUnsubscribe clean up behavior | ||
*/ | ||
@Experimental | ||
def stateful[S, T](generator: () => S)(next: S => (Notification[T], S), onUnsubscribe: S => Unit = (_:S) => ()): SyncOnSubscribe[S,T] = { | ||
// The anonymous class shadows these names | ||
val nextF = next | ||
val onUnsubscribeF = onUnsubscribe | ||
|
||
new rx.observables.SyncOnSubscribe[S,T] { | ||
import rx.lang.scala.JavaConversions._ | ||
override def generateState(): S = generator() | ||
override def next(state: S, observer: rx.Observer[_ >: T]): S = | ||
nextF(state) match { | ||
case (notification, nextState) => | ||
toJavaNotification(notification).accept(observer) | ||
nextState | ||
} | ||
override def onUnsubscribe(state: S): Unit = onUnsubscribeF(state) | ||
} | ||
} | ||
|
||
/** | ||
* Generates a [[SyncOnSubscribe]] which does not generate a new state in `next` | ||
* | ||
* @tparam T the type of the generated values | ||
* @tparam S the type of the associated state with each Subscriber | ||
* @param generator generates the state value | ||
* @param next produces data for the stream | ||
* @param onUnsubscribe clean up behavior | ||
*/ | ||
@Experimental | ||
def singleState[S,T](generator: () => S)(next: S => Notification[T], onUnsubscribe: S => Unit = (_:S) => ()): SyncOnSubscribe[S,T] = | ||
apply[S, T](generator)(s => (next(s),s), onUnsubscribe) | ||
|
||
/** | ||
* Generates a stateless [[SyncOnSubscribe]], useful when the state is closed over in `next` or the `SyncOnSubscribe` inherently does not have a state | ||
* | ||
* @tparam T the type of the generated values | ||
* @param next produces data for the stream | ||
* @param onUnsubscribe clean up behavior | ||
*/ | ||
@Experimental | ||
def stateless[T](next: () => Notification[T], onUnsubscribe: () => Unit = () => ()) = | ||
apply[Unit, T](() => ())(_ => (next(), ()), _ => onUnsubscribe()) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,4 +24,7 @@ package rx.lang.scala | |
* in Scala, because we use a pair `(key, observable)` instead of `GroupedObservable` | ||
* and a pair `(startFunction, observable)` instead of `ConnectableObservable`. | ||
*/ | ||
package object observables {} | ||
package object observables { | ||
type SyncOnSubscribe[S, +T] = rx.observables.SyncOnSubscribe[S, _ <: T] | ||
type AsyncOnSubscribe[S, +T] = rx.observables.AsyncOnSubscribe[S, _ <: T] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the variance is correct here. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -198,8 +198,8 @@ class ObservableCompletenessKit extends CompletenessKit { | |
// manually added entries for Java static methods | ||
"amb(Iterable[_ <: Observable[_ <: T]])" -> "amb(Observable[T]*)", | ||
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)", | ||
"create(SyncOnSubscribe[S, T])" -> "[TODO]", | ||
"create(AsyncOnSubscribe[S, T])" -> "[TODO]", | ||
"create(SyncOnSubscribe[S, T])" -> "create(SyncOnSubscribe[S, T])", | ||
"create(AsyncOnSubscribe[S, T])" -> "create(AsyncOnSubscribe[S, T])", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't you just delete these two lines? This list should only mention correspondences where Scala differs from Java, but here the signatures are the same. |
||
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatestWith(Observable[U])((T, U) => R)", | ||
"combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)", | ||
"combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package rx.lang.scala.observables | ||
|
||
import org.junit.Test | ||
import org.junit.Assert._ | ||
import org.scalatest.junit.JUnitSuite | ||
import rx.lang.scala.observers.TestSubscriber | ||
import rx.lang.scala.{Notification, Observable} | ||
|
||
class AsyncOnSubscribeTests extends JUnitSuite { | ||
|
||
@Test | ||
def testStateful(): Unit = { | ||
val last = 2000L | ||
val sut = Observable.create(AsyncOnSubscribe(() => 0L)((count,demand) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought this was a generally well known abbreviation for |
||
if(count > last) | ||
(Notification.OnCompleted, count) | ||
else { | ||
val max = math.max(count + demand, last) | ||
val next = Observable.from(count to max) | ||
(Notification.OnNext(next), max+1) | ||
} | ||
)) | ||
assertEquals((0L to last).toList, sut.toBlocking.toList) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good example, could you please also add something like this to |
||
|
||
@Test | ||
def testStateless(): Unit = { | ||
val sut = Observable.create(AsyncOnSubscribe.stateless(r => Notification.OnNext(Observable.just(42).repeat(r)))) | ||
assertEquals(List(42,42,42,42), sut.take(4).toBlocking.toList) | ||
} | ||
|
||
@Test | ||
def testSingleState(): Unit = { | ||
val random = math.random | ||
val sut = Observable.create(AsyncOnSubscribe.singleState(() => random)((s,r) => Notification.OnNext(Observable.just(random.toString).repeat(r)))) | ||
assertEquals(List(random.toString, random.toString), sut.take(2).toBlocking.toList) | ||
} | ||
|
||
@Test | ||
def testUnsubscribe(): Unit = { | ||
val sideEffect = new java.util.concurrent.atomic.AtomicBoolean(false) | ||
val sut = Observable.create(AsyncOnSubscribe(() => ())((s,r) => (Notification.OnCompleted, s), onUnsubscribe = s => sideEffect.set(true))) | ||
sut.foreach(_ => ()) | ||
assertEquals(true, sideEffect.get()) | ||
} | ||
|
||
@Test | ||
def testError(): Unit = { | ||
val e = new IllegalStateException("Oh noes") | ||
val sut = Observable.create(AsyncOnSubscribe(() => 0)((s,_) => (if(s>2) Notification.OnNext(Observable.just(s)) else Notification.OnError(e), s+1))) | ||
val testSubscriber = TestSubscriber[Int]() | ||
sut.subscribe(testSubscriber) | ||
testSubscriber.assertError(e) | ||
} | ||
|
||
@Test | ||
// Ensure that the generator is executed for each subscription | ||
def testGenerator(): Unit = { | ||
val sideEffectCount = new java.util.concurrent.atomic.AtomicInteger(0) | ||
val sut = Observable.create(AsyncOnSubscribe(() => sideEffectCount.incrementAndGet())((s, _) => (Notification.OnCompleted, s))) | ||
sut.toBlocking.toList | ||
sut.toBlocking.toList | ||
assertEquals(sideEffectCount.get(), 2) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package rx.lang.scala.observables | ||
|
||
import org.junit.Test | ||
import org.junit.Assert._ | ||
import org.scalatest.junit.JUnitSuite | ||
import rx.lang.scala.observers.TestSubscriber | ||
import rx.lang.scala.{Notification, Observable} | ||
|
||
class SyncOnSubscribeTests extends JUnitSuite { | ||
|
||
@Test | ||
def testStateful(): Unit = { | ||
val sut = Observable.create(SyncOnSubscribe(() => 0)(count => | ||
if(count > 3) | ||
(Notification.OnCompleted, count) | ||
else | ||
(Notification.OnNext(count), count+1) | ||
)) | ||
assertEquals(List(0,1,2,3), sut.toBlocking.toList) | ||
} | ||
|
||
@Test | ||
def testStateless(): Unit = { | ||
val sut = Observable.create(SyncOnSubscribe.stateless(() => Notification.OnNext(42))) | ||
assertEquals(List(42,42,42,42), sut.take(4).toBlocking.toList) | ||
} | ||
|
||
@Test | ||
def testSingleState(): Unit = { | ||
val random = math.random | ||
val sut = Observable.create(SyncOnSubscribe.singleState(() => random)(s => Notification.OnNext(s.toString))) | ||
assertEquals(List(random.toString, random.toString), sut.take(2).toBlocking.toList) | ||
} | ||
|
||
@Test | ||
def testUnsubscribe(): Unit = { | ||
val sideEffect = new java.util.concurrent.atomic.AtomicBoolean(false) | ||
val sut = Observable.create(SyncOnSubscribe(() => ())(s => (Notification.OnCompleted, s), onUnsubscribe = s => sideEffect.set(true))) | ||
sut.foreach(_ => ()) | ||
assertEquals(true, sideEffect.get()) | ||
} | ||
|
||
@Test | ||
def testError(): Unit = { | ||
val e = new IllegalStateException("Oh noes") | ||
val sut = Observable.create(SyncOnSubscribe(() => 0)(s => (if(s>2) Notification.OnNext(s) else Notification.OnError(e), s+1))) | ||
val testSubscriber = TestSubscriber[Int]() | ||
sut.subscribe(testSubscriber) | ||
testSubscriber.assertError(e) | ||
} | ||
|
||
@Test | ||
// Ensure that the generator is executed for each subscription | ||
def testGenerator(): Unit = { | ||
val sideEffectCount = new java.util.concurrent.atomic.AtomicInteger(0) | ||
val sut = Observable.create(SyncOnSubscribe(() => sideEffectCount.incrementAndGet())(s => (Notification.OnCompleted, s))) | ||
sut.toBlocking.toList | ||
sut.toBlocking.toList | ||
assertEquals(sideEffectCount.get(), 2) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's nice to have this public, and remove the TODO.