Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SyncOnSubscribe and AsyncOnSubscribe #220

Merged
merged 6 commits into from
Dec 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 79 additions & 66 deletions examples/src/test/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ import scala.concurrent.duration.DurationLong
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
import scala.language.implicitConversions

import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Assert.assertFalse
import org.junit.Ignore
import org.junit.Test
import org.scalatest.junit.JUnitSuite

import rx.lang.scala._
import rx.lang.scala.observables.{AsyncOnSubscribe, SyncOnSubscribe}
import rx.lang.scala.schedulers._

/**
Expand Down Expand Up @@ -742,44 +741,68 @@ class RxScalaDemo extends JUnitSuite {
}

/**
* This is the good way of doing it: If the consumer unsubscribes, no more elements are
* calculated.
* Using AsyncOnSubscribe of SyncOnSubscribe ensures that your Observable
* does not calculate elements after the consumer unsubscribes and correctly does not overload
* the subscriber with data (responds to backpressure).
*
* Here we create a SyncOnSubscribe without state, this means that it performs the same action every time downstream can handle more data.
*/
@Test def createExample(): Unit = {
val o = Observable.create(SyncOnSubscribe.stateless(
next = () => Notification.OnNext(math.random),
onUnsubscribe = () => println("I have stopped generating random numbers for this subscriber")
))
o.take(10).foreach(r => println(s"Next random number: $r"))
}

/**
* You can also add state to (A)SyncOnSubscribe, you generate a state on each subscription and can alter that state in each next call
* Here we use it to count to a specific number
*/
@Test def createExampleGood() {
val o = Observable[String](subscriber => {
var i = 0
while (i < 2 && !subscriber.isUnsubscribed) {
subscriber.onNext(calculateElement(i))
i += 1
}
if (!subscriber.isUnsubscribed) subscriber.onCompleted()
})
@Test def createExampleWithState() {
// Starts with state `0`
val o = Observable.create(SyncOnSubscribe(() => 0)(i => {
if(i < 2)
// Check if the state has reached 2 yet, if not, we emit the current state and add 1 to the state
(Notification.OnNext(i), i+1)
else
// Otherwise we signal completion
(Notification.OnCompleted, i)
}))
o.take(1).subscribe(println(_))
}

@Test def createExampleGood2() {
/**
* This example shows how to read a (potentially blocking) data source step-by-step (line by line) using SyncOnSubscribe.
*/
@Test def createExampleFromInputStream() {
import scala.io.{Codec, Source}

val rxscalaURL = "http://reactivex.io/rxscala/"
val rxscala = Observable[String](subscriber => {
try {
// We use the `singleState` helper here, since we only want to generate one state per subscriber
// and do not need to modify it afterwards
val rxscala = Observable.create(SyncOnSubscribe.singleState(
// This is our `generator`, which generates a state
generator = () => {
val input = new java.net.URL(rxscalaURL).openStream()
subscriber.add(Subscription {
input.close()
})
val iter = Source.fromInputStream(input)(Codec.UTF8).getLines()
while(iter.hasNext && !subscriber.isUnsubscribed) {
val line = iter.next()
subscriber.onNext(line)
}
if (!subscriber.isUnsubscribed) {
subscriber.onCompleted()
}
}
catch {
case e: Throwable => if (!subscriber.isUnsubscribed) subscriber.onError(e)
(input, Source.fromInputStream(input)(Codec.UTF8).getLines())
})(
// This is our `next` function, which gets called whenever the subscriber can handle more data
next = {
case (_, lines) => {
if(lines.hasNext)
// Here we provide the next line
Notification.OnNext(lines.next())
else
// Here we signal that the stream has completed
Notification.OnCompleted
}
},
// This is our `onUnsubscribe` function, which gets called after the subscriber unsubscribes, usually to perform cleanup
onUnsubscribe = {
case (input, _) => scala.util.Try { input.close() }
}
}).subscribeOn(IOScheduler())
)).subscribeOn(IOScheduler())

val count = rxscala.flatMap(_.split("\\W+").toSeq.toObservable)
.map(_.toLowerCase)
Expand All @@ -788,45 +811,35 @@ class RxScalaDemo extends JUnitSuite {
println(s"RxScala appears ${count.toBlocking.single} times in ${rxscalaURL}")
}

@Test def createExampleWithBackpressure() {
val o = Observable {
subscriber: Subscriber[String] => {
var emitted = 0
subscriber.setProducer(n => {
val intN = if (n >= 10) 10 else n.toInt
var i = 0
while(i < intN && emitted < 10 && !subscriber.isUnsubscribed) {
emitted += 1
subscriber.onNext(s"item ${emitted}")
i += 1
}
if (emitted == 10 && !subscriber.isUnsubscribed) {
subscriber.onCompleted()
}
})
}
}.subscribeOn(IOScheduler()) // Use `subscribeOn` to make sure `Producer` will run in the same Scheduler
o.observeOn(ComputationScheduler()).subscribe(new Subscriber[String] {
override def onStart() {
println("Request a new one at the beginning")
request(1)
}

override def onNext(v: String) {
println("Received " + v)
println("Request a new one after receiving " + v)
request(1)
}
/** This example show how to generate an Observable using AsyncOnSubscribe, which can be more efficient than SyncOnSubscribe.
* Using AsyncOnSubscribe has the same advantages as SyncOnSubscribe, and furthermore it allows us to generate more than one
* result at a time (which can be more efficient) and allows us to generate the results asynchronously.
*/
@Test def createExampleAsyncOnUnsubscribe(): Unit = {
// We are going to count to this number
val countTo = 200L

override def onError(e: Throwable) {
e.printStackTrace()
val o = Observable.create(AsyncOnSubscribe(() => 0L)(
(count, demand) => {
// Stop counting if we're past the number we were going to count to
if(count > countTo)
(Notification.OnCompleted, count)
else {
// Generate an observable that contains [count,count+demand) and thus contains exactly `demand` items
val to = math.min(count + demand, countTo+1)
val range = count until to
val resultObservable = Observable.from(range)
println(s"Currently at $count, received a demand of $demand. Next range [$count,$to)")
(Notification.OnNext(resultObservable), to)
}
}

override def onCompleted() {
println("Done")
))
o.subscribe(new Subscriber[Long] {
override def onStart(): Unit = request(10)
override def onNext(i: Long): Unit = {
request(scala.util.Random.nextInt(10)+1)
}
})
Thread.sleep(10000)
}

def output(s: String): Unit = println(s)
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/rx/lang/scala/Notification.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ sealed trait Notification[+T] {
}

def apply(observer: Observer[T]): Unit = accept(observer)

def map[U](f: T => U): Notification[U] = {
this match {
case Notification.OnNext(value) => Notification.OnNext(f(value))
case Notification.OnError(error) => Notification.OnError(error)
case Notification.OnCompleted => Notification.OnCompleted
}
}
}

/**
Expand Down
49 changes: 48 additions & 1 deletion src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package rx.lang.scala
import rx.annotations.{Beta, Experimental}
import rx.exceptions.OnErrorNotImplementedException
import rx.functions.FuncN
import rx.lang.scala.observables.{ConnectableObservable, ErrorDelayingObservable}
import rx.lang.scala.observables.{AsyncOnSubscribe, ConnectableObservable, ErrorDelayingObservable, SyncOnSubscribe}

import scala.concurrent.duration
import java.util

Expand Down Expand Up @@ -4919,6 +4920,52 @@ object Observable {
}
*/

/**
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
* subscribed to it will initiate the given [[observables.SyncOnSubscribe]]'s life cycle for
* generating events.
*
* Note: the `SyncOnSubscribe` provides a generic way to fulfill data by iterating
* over a (potentially stateful) function (e.g. reading data off of a channel, a parser). If your
* data comes directly from an asynchronous/potentially concurrent source then consider using [[observables.AsyncOnSubscribe]].
*
* $supportBackpressure
*
* $noDefaultScheduler
*
* @tparam T the type of the items that this Observable emits
* @tparam S the state type
* @param syncOnSubscribe
* an implementation of `SyncOnSubscribe`. There are many creation methods on the object for convenience.
* @return an Observable that, when a [[Subscriber]] subscribes to it, will use the specified `SyncOnSubscribe` to generate events
* @see [[observables.SyncOnSubscribe.stateful]]
* @see [[observables.SyncOnSubscribe.singleState]]
* @see [[observables.SyncOnSubscribe.stateless]]
*/
@Experimental
def create[S,T](syncOnSubscribe: SyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(syncOnSubscribe))

/**
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
* subscribed to it will initiate the given [[observables.AsyncOnSubscribe]]'s life cycle for
* generating events.
*
* $supportBackpressure
*
* $noDefaultScheduler
*
* @tparam T the type of the items that this Observable emits
* @tparam S the state type
* @param asyncOnSubscribe
* an implementation of `AsyncOnSubscribe`. There are many creation methods on the object for convenience.
* @return an Observable that, when a [[Subscriber]] subscribes to it, will use the specified `AsyncOnSubscribe` to generate events
* @see [[observables.AsyncOnSubscribe.stateful]]
* @see [[observables.AsyncOnSubscribe.singleState]]
* @see [[observables.AsyncOnSubscribe.stateless]]
*/
@Experimental
def create[S,T](asyncOnSubscribe: AsyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(asyncOnSubscribe))

/**
* Returns an Observable that will execute the specified function when someone subscribes to it.
*
Expand Down
78 changes: 78 additions & 0 deletions src/main/scala/rx/lang/scala/observables/AsyncOnSubscribe.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package rx.lang.scala.observables

import rx.annotations.Experimental
import rx.lang.scala.{Notification, Observable}

/**
* An utility class to create `Observable`s that start acting when subscribed to and responds
* correctly to back pressure requests from subscribers.
*
* Semantics:
* * `generator` is called to provide an initial state on each new subscription
* * `next` is called with the last state and a `requested` amount of items to provide a new state
* and an `Observable` that (potentially asynchronously) emits up to `requested` items.
* * `onUnsubscribe` is called with the state provided by the last `next` call when the observer unsubscribes
*/
object AsyncOnSubscribe {

/**
* Alias for [[AsyncOnSubscribe.stateful]]
* @see [[AsyncOnSubscribe.stateful]]
*/
@Experimental
def apply[S,T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] =
stateful[S, T](generator)(next, onUnsubscribe)

/**
* Generates a stateful [[AsyncOnSubscribe]]
*
* @tparam T the type of the generated values
* @tparam S the type of the associated state with each Subscriber
* @param generator generates the initial state value
* @param next produces observables which contain data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def stateful[S, T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] = {
// The anonymous class shadows these names
val nextF = next
val onUnsubscribeF = onUnsubscribe

new rx.observables.AsyncOnSubscribe[S,T] {
import rx.lang.scala.JavaConversions._
override def generateState(): S = generator()
override def next(state: S, requested: Long, observer: rx.Observer[rx.Observable[_ <: T]]): S =
nextF(state, requested) match {
case (notification, nextState) =>
toJavaNotification(notification.map(toJavaObservable)).accept(observer)
nextState
}
override def onUnsubscribe(state: S): Unit = onUnsubscribeF(state)
}
}

/**
* Generates a [[AsyncOnSubscribe]] which does not generate a new state in `next`
*
* @tparam T the type of the generated values
* @tparam S the type of the associated state with each Subscriber
* @param generator generates the state value
* @param next produces observables which contain data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def singleState[S, T](generator: () => S)(next: (S, Long) => Notification[Observable[T]], onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] =
stateful[S, T](generator)((s,r) => (next(s,r), s), onUnsubscribe)

/**
* Generates a stateless [[AsyncOnSubscribe]], useful when the state is closed over in `next` or the `SyncOnSubscribe` inherently does not have a state
*
* @tparam T the type of the generated values
* @param next produces observables which contain data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def stateless[T](next: Long => Notification[Observable[T]], onUnsubscribe: () => Unit = () => ()) =
stateful[Unit, T](() => ())((_,r) => (next(r), ()), _ => onUnsubscribe())

}
Loading