Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SyncOnSubscribe and AsyncOnSubscribe #220

Merged
merged 6 commits into from
Dec 10, 2016

Conversation

dhoepelman
Copy link
Collaborator

@dhoepelman dhoepelman commented Nov 25, 2016

See #219 for some discussion.

I implemented both option 2 (as createX) and 3 (as applyX), as 2 has some major annoyances when used from scala: type interferencing can't determine the T and the user requires at least 2 statements because you need to call the observer (you can't use a single expression).
The effects of this can be seen next to each other in the tests.

However, this causes duplicate very-similar functionality. If we want to avoid this I suggest we drop the java style createX methods and rename the applyX methods to createX.

@samuelgruetter
Copy link
Collaborator

I didn't have time yet for a full review, but maybe let's first discuss some desirable properties of an (A)SyncOnSubscribe solution for RxScala. I have the following in mind:

  1. Should be compatible with future deprecation of Observable.create, see 1.x: Deprecate Observable.create() RxJava#4253
  2. Should be reasonably close to RxJava, so that documentation, blog posts, forums etc on RxJava are also applicable to RxScala
  3. The "right way" of constructing Observables (i.e. through (A)SyncOnSubscribe instead of home-made usage of Observable.create) should be easily discoverable by browsing the method list of Observable. In RxJava, this was achieved by adding unnecessary overloads of Observable.create, see Creating Observable#create overloads for SyncOnSubscribe and AsyncOnSubscribe RxJava#3738
  4. In Scala code, if I write MyObject(args) (which is desugared into MyObject.apply(args)), I should get back something of type MyObject.

This PR does well on 1) and 2), but not so well on 3) and 4). Regarding 4), I write AsyncOnSubscribe(...), but I get back an Observable, that's very confusing.

I don't have a solution for 3) and 4) yet, but will keep thinking about it... @dhoepelman @zsxwing and others, please let me know what you think, and if you have any ideas.

@dhoepelman
Copy link
Collaborator Author

4 is a good point, I'll fix that.

I would add a requirement 5: "It should feel natural so use from a Scala context".
SyncOnSubscribe.createStateful(() => state)((state, obs: Observer[_ <: Value]) => {obs.onNext(value); modify(state)})
fails that, while
SyncOnSubscribe.applyStateful(() => state)(state => (Notification.OnNext(value), state))
doesn't or less so. It is somewhat at odds with 2.

For 3 We could add the two (currently misnomered) (A)SyncOnSubscribe.apply methods to the Observable object as Observable.(a)syncOnSubscribe.

We then have to think about whether we still want to provide the createStateless/createSingleState utility methods either on the Observable object, an SyncOnSubscribe object or leave them out altogether*. I think it's better leaving them all out together over creating six new methods on the Observable object. Providing the methods on a separate object should be okay, but hurts discoverability.

* We could assume a Scala programmer is sufficiently familiar with similar state concepts through things like state monads, Observable#scan or Traversable#foldLeft to ad-hoc construct them

@samuelgruetter
Copy link
Collaborator

I agree with 5).

A note regarding wildcards in type parameters: All RxScala types have declaration site variance annotations (+ or -), e.g. Observer[-T] or Observable[+T]. So, when you write Observable[T], it always means Observable[_ <: T], so you don't need to write the wildcard. (This applies to your comment above as well as to some lines in the code of this PR).

Here's an idea for 3):

  • Add two methods to Observable: Observable.create(rx.lang.scala.observables.AsyncOnSubscribe) and Observable.create(rx.lang.scala.observables.SyncOnSubscribe)
  • Define rx.lang.scala.AsyncOnSubscribe[S, -T] as a type alias for rx.observables.AsyncOnSubscribe[S, T], and same for Sync, and say in their scaladoc that instances should be created with the constructors provided in the Scala (A)SyncOnSubscribe

Like this, discoverability should be good: Users browse for Observable.create, then ask "how can I get an (A)SyncOnSubscribe", go to (A)SyncOnSubscribe, and find the constructors.
Note that since users don't interact with (A)SyncOnSubscribe directly, we don't need to wrap the Java object in a Scala object, but can just use a type alias.
And moreover, constructing an Observable in Scala will look the same as in Java: Just something like, for instance, Observable.create(SyncOnSubscribe.createStateless(...)).

WDYT?

Copy link
Contributor

@rvanheest rvanheest left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found some small things for you to consider.

*/
object AsyncOnSubscribe {
/**
* Generates an `Observable` that synchronously calls the provided `next` function
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be 'asynchronously'? (also in other scaladoc sections in this object)

Copy link
Collaborator Author

@dhoepelman dhoepelman Nov 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next is still synchronously called (see RxJava javadoc). The async in the name comes from next's resultant Observable[T], which can asynchronously deliver up to requested items.

I'll see if I can improve the docs to make this more clear.

* @see rx.observables.AsyncOnSubscribe.createStateful
*/
@Experimental
def createStateful[S, T](generator: () => S)(next: (S, Long, Observer[Observable[_ <: T]]) => S, onUnsubscribe: S => Unit = (_:S) => ()): Observable[T] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered a call-by-name parameter for generator? Would look better in using it, I think... (also in other functions in this object)

Copy link
Collaborator Author

@dhoepelman dhoepelman Nov 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered this. However the spec clearly states that generator will be called each time an observer subscribes. If we use call-by-name it will only be executed once very early into the program, this would be confusing if the generator causes a side effect.

It also doesn't fit with the intended use-case, e.g. new subscriber means set up a HTTP connection, for each onNext do a request on that connection.

It's only identical if the generator is a pure function which produces an immutable value.

Maybe worth an overload.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make sure the generator is called each time an observer subscribes, you just have to put it into a () => generator, as we did in doOnCompleted, so that's not a problem.
However, I'm not sure if we should use a by name argument here, because if people pass a () => expr instead of a by name argument, the errors (at compile time or sometimes even only at run time) can be confusing, see eg here.
And please be careful with overloads: If you want to use overloading, please add tests ensuring that the overloads don't conflict and don't harm type parameter inference, we've had bad experiences with overloads before...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thanks! Didn't know/think of that semantic.

Errors should generally less confusing, as generally the state won't be of type Unit so there's no automatic conversion from () => Unit to Unit as in the issue you linked. The stateless variants don't have a generator.

An advantage of () => T is that it makes it very explicit that it is a function (thus it will be executed, possible multiple times). I would not expect bar() to be run multiple times in foo(bar()), even if the signature is foo(x: => T), knowledge of the inner working of foo is necessary to know that.

I'll let you/the maintainers decide.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's not as bad as in the issue I linked, but there still might be confusing compile-time errors, and the readability issue you mention. So I'd prefer () => T over of by-name, even though this is a bit less Scala-idiomatic.

apply[Unit, T](() => ())((_,r) => (next(r), ()), _ => onUnsubscribe())

private[scala] class AsyncOnSubscribeImpl[S,T](val generatorF: () => S, val nextF: (S, Long, Observer[Observable[_ <: T]]) => S, val onUnsubscribeF: S => Unit) {
import rx.lang.scala.JavaConversions._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there were plans to deprecate JavaConversions (see comment by @zsxwing), maybe you should pro-actively use JavaConverters instead. (not sure what the status of this plan currently is, though!)

@dhoepelman
Copy link
Collaborator Author

@samuelgruetter Missed your last comment, that sounds like a good solution. I'll change the PR to fit it somewhere in the following days and incorporate the other comments.

@dhoepelman
Copy link
Collaborator Author

@samuelgruetter Updated the PR. Also added map to Notification, let me know if you agree.

I'm not entirely sure I got the variance right, you talked about AsyncOnSubscribe[S, -T] but shouldn't T be covariant similar to how it is in Observable[+T]?

Only way I could make it work was with:
type SyncOnSubscribe[S, +T] = rx.observables.SyncOnSubscribe[S, _ <: T]

Copy link
Collaborator

@samuelgruetter samuelgruetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some minor comments, but otherwise it's great 😃

@@ -63,6 +63,16 @@ sealed trait Notification[+T] {
}

def apply(observer: Observer[T]): Unit = accept(observer)

// TODO: Should this be public or private to rx.lang.scala?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's nice to have this public, and remove the TODO.

def map[U](f: T => U): Notification[U] = {
this match {
case Notification.OnNext(value) => Notification.OnNext(f(value))
// TODO: Or do we want to cast here? Notification.OnError[T] is not a Notification[U]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No cast, because it would be a cast that cannot be checked by the JVM, which might create a compile time warning, which would have to be suppressed etc, to complicated... (Unless someone presents benchmarks suggesting that a cast performs better).

* Semantics:
* * `generator` is called to provide an initial state on each new subscription
* * `next` is called with the last state to provide a data item and a new state for the next `next` call
* * `onUnsubscribe` is called with the state provides by the last next when the observer unsubscribes
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`onUnsubscribe` is called with the state provided by the last `next` call when the observer unsubscribes

(also in AsyncOnSubscribe)

package object observables {}
package object observables {
type SyncOnSubscribe[S, +T] = rx.observables.SyncOnSubscribe[S, _ <: T]
type AsyncOnSubscribe[S, +T] = rx.observables.AsyncOnSubscribe[S, _ <: T]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the variance is correct here.
An OnSubscribe[T] is a function Subscribe[T] => Unit, and the left side of the => is a negative position, but being the argument of Subscriber is also a negative position, and minus times minus = plus 😉

"create(SyncOnSubscribe[S, T])" -> "[TODO]",
"create(AsyncOnSubscribe[S, T])" -> "[TODO]",
"create(SyncOnSubscribe[S, T])" -> "create(SyncOnSubscribe[S, T])",
"create(AsyncOnSubscribe[S, T])" -> "create(AsyncOnSubscribe[S, T])",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you just delete these two lines? This list should only mention correspondences where Scala differs from Java, but here the signatures are the same.

@Test
def testStateful(): Unit = {
val last = 2000L
val sut = Observable.create(AsyncOnSubscribe(() => 0L)((count,demand) =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why sut? Could you please use a generally understandable name or abbreviation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this was a generally well known abbreviation for subject under test in unit tests, but apparently that's only the case in my local bubble. Will change.

}
))
assertEquals((0L to last).toList, sut.toBlocking.toList)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good example, could you please also add something like this to RxScalaDemo? Also, adding (A)SyncOnSubscribe requires some information in RxScalaDemo to be updated: createExampleGood is not good any more, but only "medium good" 😉 It respects unsubscription, but not backpressure, and using (A)SyncOnSubscribe would be better. Could you please review all examples related to Observable.create, and add some comments, and examples for (A)SyncOnSubscribe, so that this is all up to date?

@dhoepelman
Copy link
Collaborator Author

@samuelgruetter I incorporated your comments and altered the RxDemo file

I rewrote all of the create examples to use (A)SyncOnSubscribe, this seems correct no? Since rx.Observable.create(OnSubscribe) is slated to be deprecated (at least in the public API) it would seem logical for RxScala to move that way too.

I'm not entirely sure about createExampleWithBackpressure, but it seems to me (A)SyncOnSubscribe provides a way more user-friendly way to achieve the same.

Copy link
Collaborator

@samuelgruetter samuelgruetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

@zsxwing, could you please have a look at this as well before merging, since this is quite an important change, and also because you originally wrote createExampleFromInputStream.

@zsxwing
Copy link
Member

zsxwing commented Dec 10, 2016

LGTM2.

@zsxwing zsxwing merged commit 30ec23a into ReactiveX:0.x Dec 10, 2016
@zsxwing
Copy link
Member

zsxwing commented Dec 10, 2016

Thanks!

@dhoepelman dhoepelman deleted the synconsubscribe branch December 10, 2016 11:22
@dhoepelman dhoepelman mentioned this pull request Dec 10, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants