-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement SyncOnSubscribe and AsyncOnSubscribe #220
Conversation
I didn't have time yet for a full review, but maybe let's first discuss some desirable properties of an (A)SyncOnSubscribe solution for RxScala. I have the following in mind:
This PR does well on 1) and 2), but not so well on 3) and 4). Regarding 4), I write I don't have a solution for 3) and 4) yet, but will keep thinking about it... @dhoepelman @zsxwing and others, please let me know what you think, and if you have any ideas. |
4 is a good point, I'll fix that. I would add a requirement 5: "It should feel natural so use from a Scala context". For 3 We could add the two (currently misnomered) We then have to think about whether we still want to provide the * We could assume a Scala programmer is sufficiently familiar with similar state concepts through things like state monads, |
I agree with 5). A note regarding wildcards in type parameters: All RxScala types have declaration site variance annotations ( Here's an idea for 3):
Like this, discoverability should be good: Users browse for Observable.create, then ask "how can I get an (A)SyncOnSubscribe", go to (A)SyncOnSubscribe, and find the constructors. WDYT? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found some small things for you to consider.
*/ | ||
object AsyncOnSubscribe { | ||
/** | ||
* Generates an `Observable` that synchronously calls the provided `next` function |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be 'asynchronously'? (also in other scaladoc sections in this object)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The next
is still synchronously called (see RxJava javadoc). The async in the name comes from next
's resultant Observable[T]
, which can asynchronously deliver up to requested
items.
I'll see if I can improve the docs to make this more clear.
* @see rx.observables.AsyncOnSubscribe.createStateful | ||
*/ | ||
@Experimental | ||
def createStateful[S, T](generator: () => S)(next: (S, Long, Observer[Observable[_ <: T]]) => S, onUnsubscribe: S => Unit = (_:S) => ()): Observable[T] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered a call-by-name parameter for generator
? Would look better in using it, I think... (also in other functions in this object)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered this. However the spec clearly states that generator
will be called each time an observer subscribes. If we use call-by-name it will only be executed once very early into the program, this would be confusing if the generator causes a side effect.
It also doesn't fit with the intended use-case, e.g. new subscriber means set up a HTTP connection, for each onNext
do a request on that connection.
It's only identical if the generator is a pure function which produces an immutable value.
Maybe worth an overload.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make sure the generator
is called each time an observer subscribes, you just have to put it into a () => generator
, as we did in doOnCompleted, so that's not a problem.
However, I'm not sure if we should use a by name argument here, because if people pass a () => expr
instead of a by name argument, the errors (at compile time or sometimes even only at run time) can be confusing, see eg here.
And please be careful with overloads: If you want to use overloading, please add tests ensuring that the overloads don't conflict and don't harm type parameter inference, we've had bad experiences with overloads before...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah thanks! Didn't know/think of that semantic.
Errors should generally less confusing, as generally the state won't be of type Unit
so there's no automatic conversion from () => Unit
to Unit
as in the issue you linked. The stateless variants don't have a generator.
An advantage of () => T
is that it makes it very explicit that it is a function (thus it will be executed, possible multiple times). I would not expect bar()
to be run multiple times in foo(bar())
, even if the signature is foo(x: => T)
, knowledge of the inner working of foo
is necessary to know that.
I'll let you/the maintainers decide.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's not as bad as in the issue I linked, but there still might be confusing compile-time errors, and the readability issue you mention. So I'd prefer () => T
over of by-name, even though this is a bit less Scala-idiomatic.
apply[Unit, T](() => ())((_,r) => (next(r), ()), _ => onUnsubscribe()) | ||
|
||
private[scala] class AsyncOnSubscribeImpl[S,T](val generatorF: () => S, val nextF: (S, Long, Observer[Observable[_ <: T]]) => S, val onUnsubscribeF: S => Unit) { | ||
import rx.lang.scala.JavaConversions._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there were plans to deprecate JavaConversions
(see comment by @zsxwing), maybe you should pro-actively use JavaConverters
instead. (not sure what the status of this plan currently is, though!)
@samuelgruetter Missed your last comment, that sounds like a good solution. I'll change the PR to fit it somewhere in the following days and incorporate the other comments. |
0fcf8e9
to
dee5513
Compare
dee5513
to
5b20d2e
Compare
@samuelgruetter Updated the PR. Also added I'm not entirely sure I got the variance right, you talked about Only way I could make it work was with: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some minor comments, but otherwise it's great 😃
@@ -63,6 +63,16 @@ sealed trait Notification[+T] { | |||
} | |||
|
|||
def apply(observer: Observer[T]): Unit = accept(observer) | |||
|
|||
// TODO: Should this be public or private to rx.lang.scala? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's nice to have this public, and remove the TODO.
def map[U](f: T => U): Notification[U] = { | ||
this match { | ||
case Notification.OnNext(value) => Notification.OnNext(f(value)) | ||
// TODO: Or do we want to cast here? Notification.OnError[T] is not a Notification[U] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No cast, because it would be a cast that cannot be checked by the JVM, which might create a compile time warning, which would have to be suppressed etc, to complicated... (Unless someone presents benchmarks suggesting that a cast performs better).
* Semantics: | ||
* * `generator` is called to provide an initial state on each new subscription | ||
* * `next` is called with the last state to provide a data item and a new state for the next `next` call | ||
* * `onUnsubscribe` is called with the state provides by the last next when the observer unsubscribes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`onUnsubscribe` is called with the state provided by the last `next` call when the observer unsubscribes
(also in AsyncOnSubscribe)
package object observables {} | ||
package object observables { | ||
type SyncOnSubscribe[S, +T] = rx.observables.SyncOnSubscribe[S, _ <: T] | ||
type AsyncOnSubscribe[S, +T] = rx.observables.AsyncOnSubscribe[S, _ <: T] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the variance is correct here.
An OnSubscribe[T]
is a function Subscribe[T] => Unit
, and the left side of the =>
is a negative position, but being the argument of Subscriber
is also a negative position, and minus times minus = plus 😉
"create(SyncOnSubscribe[S, T])" -> "[TODO]", | ||
"create(AsyncOnSubscribe[S, T])" -> "[TODO]", | ||
"create(SyncOnSubscribe[S, T])" -> "create(SyncOnSubscribe[S, T])", | ||
"create(AsyncOnSubscribe[S, T])" -> "create(AsyncOnSubscribe[S, T])", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't you just delete these two lines? This list should only mention correspondences where Scala differs from Java, but here the signatures are the same.
@Test | ||
def testStateful(): Unit = { | ||
val last = 2000L | ||
val sut = Observable.create(AsyncOnSubscribe(() => 0L)((count,demand) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why sut
? Could you please use a generally understandable name or abbreviation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought this was a generally well known abbreviation for subject under test
in unit tests, but apparently that's only the case in my local bubble. Will change.
} | ||
)) | ||
assertEquals((0L to last).toList, sut.toBlocking.toList) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good example, could you please also add something like this to RxScalaDemo
? Also, adding (A)SyncOnSubscribe requires some information in RxScalaDemo
to be updated: createExampleGood
is not good any more, but only "medium good" 😉 It respects unsubscription, but not backpressure, and using (A)SyncOnSubscribe would be better. Could you please review all examples related to Observable.create
, and add some comments, and examples for (A)SyncOnSubscribe, so that this is all up to date?
fc55140
to
b515226
Compare
@samuelgruetter I incorporated your comments and altered the RxDemo file I rewrote all of the I'm not entirely sure about |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
@zsxwing, could you please have a look at this as well before merging, since this is quite an important change, and also because you originally wrote createExampleFromInputStream
.
LGTM2. |
Thanks! |
See #219 for some discussion.
I implemented both option 2 (as
createX
) and 3 (asapplyX
), as 2 has some major annoyances when used from scala: type interferencing can't determine theT
and the user requires at least 2 statements because you need to call the observer (you can't use a single expression).The effects of this can be seen next to each other in the tests.
However, this causes duplicate very-similar functionality. If we want to avoid this I suggest we drop the java style
createX
methods and rename theapplyX
methods tocreateX
.