Skip to content

Commit

Permalink
Merge pull request #220 from dhoepelman/synconsubscribe
Browse files Browse the repository at this point in the history
Implement SyncOnSubscribe and AsyncOnSubscribe
  • Loading branch information
zsxwing authored Dec 10, 2016
2 parents 9dc9ba3 + b515226 commit 30ec23a
Show file tree
Hide file tree
Showing 9 changed files with 420 additions and 70 deletions.
145 changes: 79 additions & 66 deletions examples/src/test/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ import scala.concurrent.duration.DurationLong
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
import scala.language.implicitConversions

import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Assert.assertFalse
import org.junit.Ignore
import org.junit.Test
import org.scalatest.junit.JUnitSuite

import rx.lang.scala._
import rx.lang.scala.observables.{AsyncOnSubscribe, SyncOnSubscribe}
import rx.lang.scala.schedulers._

/**
Expand Down Expand Up @@ -742,44 +741,68 @@ class RxScalaDemo extends JUnitSuite {
}

/**
* This is the good way of doing it: If the consumer unsubscribes, no more elements are
* calculated.
* Using AsyncOnSubscribe of SyncOnSubscribe ensures that your Observable
* does not calculate elements after the consumer unsubscribes and correctly does not overload
* the subscriber with data (responds to backpressure).
*
* Here we create a SyncOnSubscribe without state, this means that it performs the same action every time downstream can handle more data.
*/
@Test def createExample(): Unit = {
val o = Observable.create(SyncOnSubscribe.stateless(
next = () => Notification.OnNext(math.random),
onUnsubscribe = () => println("I have stopped generating random numbers for this subscriber")
))
o.take(10).foreach(r => println(s"Next random number: $r"))
}

/**
* You can also add state to (A)SyncOnSubscribe, you generate a state on each subscription and can alter that state in each next call
* Here we use it to count to a specific number
*/
@Test def createExampleGood() {
val o = Observable[String](subscriber => {
var i = 0
while (i < 2 && !subscriber.isUnsubscribed) {
subscriber.onNext(calculateElement(i))
i += 1
}
if (!subscriber.isUnsubscribed) subscriber.onCompleted()
})
@Test def createExampleWithState() {
// Starts with state `0`
val o = Observable.create(SyncOnSubscribe(() => 0)(i => {
if(i < 2)
// Check if the state has reached 2 yet, if not, we emit the current state and add 1 to the state
(Notification.OnNext(i), i+1)
else
// Otherwise we signal completion
(Notification.OnCompleted, i)
}))
o.take(1).subscribe(println(_))
}

@Test def createExampleGood2() {
/**
* This example shows how to read a (potentially blocking) data source step-by-step (line by line) using SyncOnSubscribe.
*/
@Test def createExampleFromInputStream() {
import scala.io.{Codec, Source}

val rxscalaURL = "http://reactivex.io/rxscala/"
val rxscala = Observable[String](subscriber => {
try {
// We use the `singleState` helper here, since we only want to generate one state per subscriber
// and do not need to modify it afterwards
val rxscala = Observable.create(SyncOnSubscribe.singleState(
// This is our `generator`, which generates a state
generator = () => {
val input = new java.net.URL(rxscalaURL).openStream()
subscriber.add(Subscription {
input.close()
})
val iter = Source.fromInputStream(input)(Codec.UTF8).getLines()
while(iter.hasNext && !subscriber.isUnsubscribed) {
val line = iter.next()
subscriber.onNext(line)
}
if (!subscriber.isUnsubscribed) {
subscriber.onCompleted()
}
}
catch {
case e: Throwable => if (!subscriber.isUnsubscribed) subscriber.onError(e)
(input, Source.fromInputStream(input)(Codec.UTF8).getLines())
})(
// This is our `next` function, which gets called whenever the subscriber can handle more data
next = {
case (_, lines) => {
if(lines.hasNext)
// Here we provide the next line
Notification.OnNext(lines.next())
else
// Here we signal that the stream has completed
Notification.OnCompleted
}
},
// This is our `onUnsubscribe` function, which gets called after the subscriber unsubscribes, usually to perform cleanup
onUnsubscribe = {
case (input, _) => scala.util.Try { input.close() }
}
}).subscribeOn(IOScheduler())
)).subscribeOn(IOScheduler())

val count = rxscala.flatMap(_.split("\\W+").toSeq.toObservable)
.map(_.toLowerCase)
Expand All @@ -788,45 +811,35 @@ class RxScalaDemo extends JUnitSuite {
println(s"RxScala appears ${count.toBlocking.single} times in ${rxscalaURL}")
}

@Test def createExampleWithBackpressure() {
val o = Observable {
subscriber: Subscriber[String] => {
var emitted = 0
subscriber.setProducer(n => {
val intN = if (n >= 10) 10 else n.toInt
var i = 0
while(i < intN && emitted < 10 && !subscriber.isUnsubscribed) {
emitted += 1
subscriber.onNext(s"item ${emitted}")
i += 1
}
if (emitted == 10 && !subscriber.isUnsubscribed) {
subscriber.onCompleted()
}
})
}
}.subscribeOn(IOScheduler()) // Use `subscribeOn` to make sure `Producer` will run in the same Scheduler
o.observeOn(ComputationScheduler()).subscribe(new Subscriber[String] {
override def onStart() {
println("Request a new one at the beginning")
request(1)
}

override def onNext(v: String) {
println("Received " + v)
println("Request a new one after receiving " + v)
request(1)
}
/** This example show how to generate an Observable using AsyncOnSubscribe, which can be more efficient than SyncOnSubscribe.
* Using AsyncOnSubscribe has the same advantages as SyncOnSubscribe, and furthermore it allows us to generate more than one
* result at a time (which can be more efficient) and allows us to generate the results asynchronously.
*/
@Test def createExampleAsyncOnUnsubscribe(): Unit = {
// We are going to count to this number
val countTo = 200L

override def onError(e: Throwable) {
e.printStackTrace()
val o = Observable.create(AsyncOnSubscribe(() => 0L)(
(count, demand) => {
// Stop counting if we're past the number we were going to count to
if(count > countTo)
(Notification.OnCompleted, count)
else {
// Generate an observable that contains [count,count+demand) and thus contains exactly `demand` items
val to = math.min(count + demand, countTo+1)
val range = count until to
val resultObservable = Observable.from(range)
println(s"Currently at $count, received a demand of $demand. Next range [$count,$to)")
(Notification.OnNext(resultObservable), to)
}
}

override def onCompleted() {
println("Done")
))
o.subscribe(new Subscriber[Long] {
override def onStart(): Unit = request(10)
override def onNext(i: Long): Unit = {
request(scala.util.Random.nextInt(10)+1)
}
})
Thread.sleep(10000)
}

def output(s: String): Unit = println(s)
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/rx/lang/scala/Notification.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ sealed trait Notification[+T] {
}

def apply(observer: Observer[T]): Unit = accept(observer)

def map[U](f: T => U): Notification[U] = {
this match {
case Notification.OnNext(value) => Notification.OnNext(f(value))
case Notification.OnError(error) => Notification.OnError(error)
case Notification.OnCompleted => Notification.OnCompleted
}
}
}

/**
Expand Down
49 changes: 48 additions & 1 deletion src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package rx.lang.scala
import rx.annotations.{Beta, Experimental}
import rx.exceptions.OnErrorNotImplementedException
import rx.functions.FuncN
import rx.lang.scala.observables.{ConnectableObservable, ErrorDelayingObservable}
import rx.lang.scala.observables.{AsyncOnSubscribe, ConnectableObservable, ErrorDelayingObservable, SyncOnSubscribe}

import scala.concurrent.duration
import java.util

Expand Down Expand Up @@ -4919,6 +4920,52 @@ object Observable {
}
*/

/**
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
* subscribed to it will initiate the given [[observables.SyncOnSubscribe]]'s life cycle for
* generating events.
*
* Note: the `SyncOnSubscribe` provides a generic way to fulfill data by iterating
* over a (potentially stateful) function (e.g. reading data off of a channel, a parser). If your
* data comes directly from an asynchronous/potentially concurrent source then consider using [[observables.AsyncOnSubscribe]].
*
* $supportBackpressure
*
* $noDefaultScheduler
*
* @tparam T the type of the items that this Observable emits
* @tparam S the state type
* @param syncOnSubscribe
* an implementation of `SyncOnSubscribe`. There are many creation methods on the object for convenience.
* @return an Observable that, when a [[Subscriber]] subscribes to it, will use the specified `SyncOnSubscribe` to generate events
* @see [[observables.SyncOnSubscribe.stateful]]
* @see [[observables.SyncOnSubscribe.singleState]]
* @see [[observables.SyncOnSubscribe.stateless]]
*/
@Experimental
def create[S,T](syncOnSubscribe: SyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(syncOnSubscribe))

/**
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
* subscribed to it will initiate the given [[observables.AsyncOnSubscribe]]'s life cycle for
* generating events.
*
* $supportBackpressure
*
* $noDefaultScheduler
*
* @tparam T the type of the items that this Observable emits
* @tparam S the state type
* @param asyncOnSubscribe
* an implementation of `AsyncOnSubscribe`. There are many creation methods on the object for convenience.
* @return an Observable that, when a [[Subscriber]] subscribes to it, will use the specified `AsyncOnSubscribe` to generate events
* @see [[observables.AsyncOnSubscribe.stateful]]
* @see [[observables.AsyncOnSubscribe.singleState]]
* @see [[observables.AsyncOnSubscribe.stateless]]
*/
@Experimental
def create[S,T](asyncOnSubscribe: AsyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(asyncOnSubscribe))

/**
* Returns an Observable that will execute the specified function when someone subscribes to it.
*
Expand Down
78 changes: 78 additions & 0 deletions src/main/scala/rx/lang/scala/observables/AsyncOnSubscribe.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package rx.lang.scala.observables

import rx.annotations.Experimental
import rx.lang.scala.{Notification, Observable}

/**
* An utility class to create `Observable`s that start acting when subscribed to and responds
* correctly to back pressure requests from subscribers.
*
* Semantics:
* * `generator` is called to provide an initial state on each new subscription
* * `next` is called with the last state and a `requested` amount of items to provide a new state
* and an `Observable` that (potentially asynchronously) emits up to `requested` items.
* * `onUnsubscribe` is called with the state provided by the last `next` call when the observer unsubscribes
*/
object AsyncOnSubscribe {

/**
* Alias for [[AsyncOnSubscribe.stateful]]
* @see [[AsyncOnSubscribe.stateful]]
*/
@Experimental
def apply[S,T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] =
stateful[S, T](generator)(next, onUnsubscribe)

/**
* Generates a stateful [[AsyncOnSubscribe]]
*
* @tparam T the type of the generated values
* @tparam S the type of the associated state with each Subscriber
* @param generator generates the initial state value
* @param next produces observables which contain data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def stateful[S, T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] = {
// The anonymous class shadows these names
val nextF = next
val onUnsubscribeF = onUnsubscribe

new rx.observables.AsyncOnSubscribe[S,T] {
import rx.lang.scala.JavaConversions._
override def generateState(): S = generator()
override def next(state: S, requested: Long, observer: rx.Observer[rx.Observable[_ <: T]]): S =
nextF(state, requested) match {
case (notification, nextState) =>
toJavaNotification(notification.map(toJavaObservable)).accept(observer)
nextState
}
override def onUnsubscribe(state: S): Unit = onUnsubscribeF(state)
}
}

/**
* Generates a [[AsyncOnSubscribe]] which does not generate a new state in `next`
*
* @tparam T the type of the generated values
* @tparam S the type of the associated state with each Subscriber
* @param generator generates the state value
* @param next produces observables which contain data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def singleState[S, T](generator: () => S)(next: (S, Long) => Notification[Observable[T]], onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] =
stateful[S, T](generator)((s,r) => (next(s,r), s), onUnsubscribe)

/**
* Generates a stateless [[AsyncOnSubscribe]], useful when the state is closed over in `next` or the `SyncOnSubscribe` inherently does not have a state
*
* @tparam T the type of the generated values
* @param next produces observables which contain data for the stream
* @param onUnsubscribe clean up behavior
*/
@Experimental
def stateless[T](next: Long => Notification[Observable[T]], onUnsubscribe: () => Unit = () => ()) =
stateful[Unit, T](() => ())((_,r) => (next(r), ()), _ => onUnsubscribe())

}
Loading

0 comments on commit 30ec23a

Please sign in to comment.