Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SyncOnSubscribe and AsyncOnSubscribe #220

Merged
merged 6 commits into from
Dec 10, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/scala/rx/lang/scala/Notification.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ sealed trait Notification[+T] {
}

def apply(observer: Observer[T]): Unit = accept(observer)

// TODO: Should this be public or private to rx.lang.scala?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's nice to have this public, and remove the TODO.

def map[U](f: T => U): Notification[U] = {
this match {
case Notification.OnNext(value) => Notification.OnNext(f(value))
// TODO: Or do we want to cast here? Notification.OnError[T] is not a Notification[U]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No cast, because it would be a cast that cannot be checked by the JVM, which might create a compile time warning, which would have to be suppressed etc, to complicated... (Unless someone presents benchmarks suggesting that a cast performs better).

case Notification.OnError(error) => Notification.OnError(error)
case Notification.OnCompleted => Notification.OnCompleted
}
}
}

/**
Expand Down
49 changes: 48 additions & 1 deletion src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package rx.lang.scala
import rx.annotations.{Beta, Experimental}
import rx.exceptions.OnErrorNotImplementedException
import rx.functions.FuncN
import rx.lang.scala.observables.{ConnectableObservable, ErrorDelayingObservable}
import rx.lang.scala.observables.{AsyncOnSubscribe, ConnectableObservable, ErrorDelayingObservable, SyncOnSubscribe}

import scala.concurrent.duration
import java.util

Expand Down Expand Up @@ -4919,6 +4920,52 @@ object Observable {
}
*/

/**
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
* subscribed to it will initiate the given [[observables.SyncOnSubscribe]]'s life cycle for
* generating events.
*
* Note: the `SyncOnSubscribe` provides a generic way to fulfill data by iterating
* over a (potentially stateful) function (e.g. reading data off of a channel, a parser). If your
* data comes directly from an asynchronous/potentially concurrent source then consider using [[observables.AsyncOnSubscribe]].
*
* $supportBackpressure
*
* $noDefaultScheduler
*
* @tparam T the type of the items that this Observable emits
* @tparam S the state type
* @param syncOnSubscribe
* an implementation of `SyncOnSubscribe`. There are many creation methods on the object for convenience.
* @return an Observable that, when a [[Subscriber]] subscribes to it, will use the specified `SyncOnSubscribe` to generate events
* @see [[observables.SyncOnSubscribe.stateful]]
* @see [[observables.SyncOnSubscribe.singleState]]
* @see [[observables.SyncOnSubscribe.stateless]]
*/
@Experimental
def create[S,T](syncOnSubscribe: SyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(syncOnSubscribe))

/**
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
* subscribed to it will initiate the given [[observables.AsyncOnSubscribe]]'s life cycle for
* generating events.
*
* $supportBackpressure
*
* $noDefaultScheduler
*
* @tparam T the type of the items that this Observable emits
* @tparam S the state type
* @param asyncOnSubscribe
* an implementation of `AsyncOnSubscribe`. There are many creation methods on the object for convenience.
* @return an Observable that, when a [[Subscriber]] subscribes to it, will use the specified `AsyncOnSubscribe` to generate events
* @see [[observables.AsyncOnSubscribe.stateful]]
* @see [[observables.AsyncOnSubscribe.singleState]]
* @see [[observables.AsyncOnSubscribe.stateless]]
*/
@Experimental
def create[S,T](asyncOnSubscribe: AsyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(asyncOnSubscribe))

/**
* Returns an Observable that will execute the specified function when someone subscribes to it.
*
Expand Down
78 changes: 78 additions & 0 deletions src/main/scala/rx/lang/scala/observables/AsyncOnSubscribe.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package rx.lang.scala.observables

import rx.annotations.Experimental
import rx.lang.scala.{Notification, Observable}

/**
* An utility class to create `Observable`s that start acting when subscribed to and responds
* correctly to back pressure requests from subscribers.
*
* Semantics:
* * `generator` is called to provide an initial state on each new subscription
* * `next` is called with the last state and a `requested` amount of items to provide a new state
* and an `Observable` that (potentially asynchronously) emits up to `requested` items.
* * `onUnsubscribe` is called with the state provides by the last next when the observer unsubscribes
*/
object AsyncOnSubscribe {

/**
* Alias for [[AsyncOnSubscribe.stateful]]
* @see [[AsyncOnSubscribe.stateful]]
*/
@Experimental
def apply[S,T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] =
stateful[S, T](generator)(next, onUnsubscribe)

/**
* Generates a stateful [[AsyncOnSubscribe]]
*
* @tparam T the type of the generated values
* @tparam S the type of the associated state with each Subscriber
* @param generator generates the initial state value
* @param next produces observables which contain data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def stateful[S, T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] = {
// The anonymous class shadows these names
val nextF = next
val onUnsubscribeF = onUnsubscribe

new rx.observables.AsyncOnSubscribe[S,T] {
import rx.lang.scala.JavaConversions._
override def generateState(): S = generator()
override def next(state: S, requested: Long, observer: rx.Observer[rx.Observable[_ <: T]]): S =
nextF(state, requested) match {
case (notification, nextState) =>
toJavaNotification(notification.map(toJavaObservable)).accept(observer)
nextState
}
override def onUnsubscribe(state: S): Unit = onUnsubscribeF(state)
}
}

/**
* Generates a [[AsyncOnSubscribe]] which does not generate a new state in `next`
*
* @tparam T the type of the generated values
* @tparam S the type of the associated state with each Subscriber
* @param generator generates the state value
* @param next produces observables which contain data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def singleState[S, T](generator: () => S)(next: (S, Long) => Notification[Observable[T]], onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] =
stateful[S, T](generator)((s,r) => (next(s,r), s), onUnsubscribe)

/**
* Generates a stateless [[AsyncOnSubscribe]], useful when the state is closed over in `next` or the `SyncOnSubscribe` inherently does not have a state
*
* @tparam T the type of the generated values
* @param next produces observables which contain data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def stateless[T](next: Long => Notification[Observable[T]], onUnsubscribe: () => Unit = () => ()) =
stateful[Unit, T](() => ())((_,r) => (next(r), ()), _ => onUnsubscribe())

}
77 changes: 77 additions & 0 deletions src/main/scala/rx/lang/scala/observables/SyncOnSubscribe.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package rx.lang.scala.observables

import rx.annotations.Experimental
import rx.lang.scala.{Notification, Observable}

/**
* An utility class to create `Observable`s that start acting when subscribed to and responds
* correctly to back pressure requests from subscribers.
*
* Semantics:
* * `generator` is called to provide an initial state on each new subscription
* * `next` is called with the last state to provide a data item and a new state for the next `next` call
* * `onUnsubscribe` is called with the state provides by the last next when the observer unsubscribes
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`onUnsubscribe` is called with the state provided by the last `next` call when the observer unsubscribes

(also in AsyncOnSubscribe)

*/
object SyncOnSubscribe {

/**
* Alias for [[SyncOnSubscribe.stateful]]
* @see [[SyncOnSubscribe.stateful]]
*/
@Experimental
def apply[S, T](generator: () => S)(next: S => (Notification[T], S), onUnsubscribe: S => Unit = (_:S) => ()): SyncOnSubscribe[S,T] =
stateful[S, T](generator)(next, onUnsubscribe)

/**
* Generates a stateful [[SyncOnSubscribe]]
*
* @tparam T the type of the generated values
* @tparam S the type of the associated state with each Subscriber
* @param generator generates the initial state value
* @param next produces data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def stateful[S, T](generator: () => S)(next: S => (Notification[T], S), onUnsubscribe: S => Unit = (_:S) => ()): SyncOnSubscribe[S,T] = {
// The anonymous class shadows these names
val nextF = next
val onUnsubscribeF = onUnsubscribe

new rx.observables.SyncOnSubscribe[S,T] {
import rx.lang.scala.JavaConversions._
override def generateState(): S = generator()
override def next(state: S, observer: rx.Observer[_ >: T]): S =
nextF(state) match {
case (notification, nextState) =>
toJavaNotification(notification).accept(observer)
nextState
}
override def onUnsubscribe(state: S): Unit = onUnsubscribeF(state)
}
}

/**
* Generates a [[SyncOnSubscribe]] which does not generate a new state in `next`
*
* @tparam T the type of the generated values
* @tparam S the type of the associated state with each Subscriber
* @param generator generates the state value
* @param next produces data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def singleState[S,T](generator: () => S)(next: S => Notification[T], onUnsubscribe: S => Unit = (_:S) => ()): SyncOnSubscribe[S,T] =
apply[S, T](generator)(s => (next(s),s), onUnsubscribe)

/**
* Generates a stateless [[SyncOnSubscribe]], useful when the state is closed over in `next` or the `SyncOnSubscribe` inherently does not have a state
*
* @tparam T the type of the generated values
* @param next produces data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def stateless[T](next: () => Notification[T], onUnsubscribe: () => Unit = () => ()) =
apply[Unit, T](() => ())(_ => (next(), ()), _ => onUnsubscribe())

}
5 changes: 4 additions & 1 deletion src/main/scala/rx/lang/scala/observables/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ package rx.lang.scala
* in Scala, because we use a pair `(key, observable)` instead of `GroupedObservable`
* and a pair `(startFunction, observable)` instead of `ConnectableObservable`.
*/
package object observables {}
package object observables {
type SyncOnSubscribe[S, +T] = rx.observables.SyncOnSubscribe[S, _ <: T]
type AsyncOnSubscribe[S, +T] = rx.observables.AsyncOnSubscribe[S, _ <: T]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the variance is correct here.
An OnSubscribe[T] is a function Subscribe[T] => Unit, and the left side of the => is a negative position, but being the argument of Subscriber is also a negative position, and minus times minus = plus 😉

}
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ class ObservableCompletenessKit extends CompletenessKit {
// manually added entries for Java static methods
"amb(Iterable[_ <: Observable[_ <: T]])" -> "amb(Observable[T]*)",
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)",
"create(SyncOnSubscribe[S, T])" -> "[TODO]",
"create(AsyncOnSubscribe[S, T])" -> "[TODO]",
"create(SyncOnSubscribe[S, T])" -> "create(SyncOnSubscribe[S, T])",
"create(AsyncOnSubscribe[S, T])" -> "create(AsyncOnSubscribe[S, T])",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you just delete these two lines? This list should only mention correspondences where Scala differs from Java, but here the signatures are the same.

"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatestWith(Observable[U])((T, U) => R)",
"combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
"combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rx.lang.scala.observables

import org.junit.Test
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import rx.lang.scala.observers.TestSubscriber
import rx.lang.scala.{Notification, Observable}

class AsyncOnSubscribeTests extends JUnitSuite {

@Test
def testStateful(): Unit = {
val last = 2000L
val sut = Observable.create(AsyncOnSubscribe(() => 0L)((count,demand) =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why sut? Could you please use a generally understandable name or abbreviation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this was a generally well known abbreviation for subject under test in unit tests, but apparently that's only the case in my local bubble. Will change.

if(count > last)
(Notification.OnCompleted, count)
else {
val max = math.max(count + demand, last)
val next = Observable.from(count to max)
(Notification.OnNext(next), max+1)
}
))
assertEquals((0L to last).toList, sut.toBlocking.toList)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good example, could you please also add something like this to RxScalaDemo? Also, adding (A)SyncOnSubscribe requires some information in RxScalaDemo to be updated: createExampleGood is not good any more, but only "medium good" 😉 It respects unsubscription, but not backpressure, and using (A)SyncOnSubscribe would be better. Could you please review all examples related to Observable.create, and add some comments, and examples for (A)SyncOnSubscribe, so that this is all up to date?


@Test
def testStateless(): Unit = {
val sut = Observable.create(AsyncOnSubscribe.stateless(r => Notification.OnNext(Observable.just(42).repeat(r))))
assertEquals(List(42,42,42,42), sut.take(4).toBlocking.toList)
}

@Test
def testSingleState(): Unit = {
val random = math.random
val sut = Observable.create(AsyncOnSubscribe.singleState(() => random)((s,r) => Notification.OnNext(Observable.just(random.toString).repeat(r))))
assertEquals(List(random.toString, random.toString), sut.take(2).toBlocking.toList)
}

@Test
def testUnsubscribe(): Unit = {
val sideEffect = new java.util.concurrent.atomic.AtomicBoolean(false)
val sut = Observable.create(AsyncOnSubscribe(() => ())((s,r) => (Notification.OnCompleted, s), onUnsubscribe = s => sideEffect.set(true)))
sut.foreach(_ => ())
assertEquals(true, sideEffect.get())
}

@Test
def testError(): Unit = {
val e = new IllegalStateException("Oh noes")
val sut = Observable.create(AsyncOnSubscribe(() => 0)((s,_) => (if(s>2) Notification.OnNext(Observable.just(s)) else Notification.OnError(e), s+1)))
val testSubscriber = TestSubscriber[Int]()
sut.subscribe(testSubscriber)
testSubscriber.assertError(e)
}

@Test
// Ensure that the generator is executed for each subscription
def testGenerator(): Unit = {
val sideEffectCount = new java.util.concurrent.atomic.AtomicInteger(0)
val sut = Observable.create(AsyncOnSubscribe(() => sideEffectCount.incrementAndGet())((s, _) => (Notification.OnCompleted, s)))
sut.toBlocking.toList
sut.toBlocking.toList
assertEquals(sideEffectCount.get(), 2)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package rx.lang.scala.observables

import org.junit.Test
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import rx.lang.scala.observers.TestSubscriber
import rx.lang.scala.{Notification, Observable}

class SyncOnSubscribeTests extends JUnitSuite {

@Test
def testStateful(): Unit = {
val sut = Observable.create(SyncOnSubscribe(() => 0)(count =>
if(count > 3)
(Notification.OnCompleted, count)
else
(Notification.OnNext(count), count+1)
))
assertEquals(List(0,1,2,3), sut.toBlocking.toList)
}

@Test
def testStateless(): Unit = {
val sut = Observable.create(SyncOnSubscribe.stateless(() => Notification.OnNext(42)))
assertEquals(List(42,42,42,42), sut.take(4).toBlocking.toList)
}

@Test
def testSingleState(): Unit = {
val random = math.random
val sut = Observable.create(SyncOnSubscribe.singleState(() => random)(s => Notification.OnNext(s.toString)))
assertEquals(List(random.toString, random.toString), sut.take(2).toBlocking.toList)
}

@Test
def testUnsubscribe(): Unit = {
val sideEffect = new java.util.concurrent.atomic.AtomicBoolean(false)
val sut = Observable.create(SyncOnSubscribe(() => ())(s => (Notification.OnCompleted, s), onUnsubscribe = s => sideEffect.set(true)))
sut.foreach(_ => ())
assertEquals(true, sideEffect.get())
}

@Test
def testError(): Unit = {
val e = new IllegalStateException("Oh noes")
val sut = Observable.create(SyncOnSubscribe(() => 0)(s => (if(s>2) Notification.OnNext(s) else Notification.OnError(e), s+1)))
val testSubscriber = TestSubscriber[Int]()
sut.subscribe(testSubscriber)
testSubscriber.assertError(e)
}

@Test
// Ensure that the generator is executed for each subscription
def testGenerator(): Unit = {
val sideEffectCount = new java.util.concurrent.atomic.AtomicInteger(0)
val sut = Observable.create(SyncOnSubscribe(() => sideEffectCount.incrementAndGet())(s => (Notification.OnCompleted, s)))
sut.toBlocking.toList
sut.toBlocking.toList
assertEquals(sideEffectCount.get(), 2)
}
}