Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor toMultimap #68

Merged
merged 3 commits into from
Dec 4, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1005,34 +1005,37 @@ class RxScalaDemo extends JUnitSuite {
@Test def toMultimapExample1(): Unit = {
val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
val keySelector = (s: String) => s.head
val m = o.toMultimap(keySelector)
val m = o.toMultiMap(keySelector)
println(m.toBlocking.single)
}

@Test def toMultimapExample2(): Unit = {
val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
val keySelector = (s: String) => s.head
val valueSelector = (s: String) => s.tail
val m = o.toMultimap(keySelector, valueSelector)
val m = o.toMultiMap(keySelector, valueSelector)
println(m.toBlocking.single)
}

@Test def toMultimapExample3(): Unit = {
val o: Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
val keySelector = (s: String) => s.head
val valueSelector = (s: String) => s.tail
val mapFactory = () => mutable.Map('d' -> mutable.Buffer("oug"))
val m = o.toMultimap(keySelector, valueSelector, mapFactory)
val m = o.toMultiMap(keySelector, valueSelector, {
new mutable.HashMap[Char, mutable.Set[String]] with mutable.MultiMap[Char, String] addBinding('d', "oug")
})
println(m.toBlocking.single.mapValues(_.toList))
}

@Test def toMultimapExample4(): Unit = {
val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
val keySelector = (s: String) => s.head
val valueSelector = (s: String) => s.tail
val mapFactory = () => mutable.Map('d' -> mutable.ListBuffer("oug"))
val bufferFactory = (k: Char) => mutable.ListBuffer[String]()
val m = o.toMultimap(keySelector, valueSelector, mapFactory, bufferFactory)
val m = o.toMultiMap(keySelector, valueSelector, {
new mutable.HashMap[Char, mutable.Set[String]] with mutable.MultiMap[Char, String] {
override def makeSet = new mutable.TreeSet[String]
}
})
println(m.toBlocking.single)
}

Expand Down
76 changes: 26 additions & 50 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3985,92 +3985,68 @@ trait Observable[+T]
}

/**
* Returns an Observable that emits a single `Map` that contains an `Seq` of items emitted by the
* source Observable keyed by a specified `keySelector` function.
* Returns an Observable that emits a single `mutable.MultiMap` that contains items emitted by the
* source Observable keyed by a specified `keySelector` function. The items having the same
* key will be put into a `Set`.
*
* <img width="640" height="305" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toMultiMap.png">
*
* @param keySelector the function that extracts the key from the source items to be used as key in the HashMap
* @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from
* the source Observable
* @param keySelector the function that extracts the key from the source items to be used as key in the `mutable.MultiMap`
* @return an Observable that emits a single item: a `mutable.MultiMap` that contains items emitted by the
* source Observable keyed by a specified `keySelector` function.
*/
def toMultimap[K](keySelector: T => K): Observable[scala.collection.Map[K, Seq[T]]] = {
toMultimap(keySelector, k => k)
def toMultiMap[K, V >: T](keySelector: T => K): Observable[mutable.MultiMap[K, V]] = {
toMultiMap(keySelector, k => k)
}

/**
* Returns an Observable that emits a single `Map` that contains an `Seq` of values extracted by a
* Returns an Observable that emits a single `mutable.MultiMap` that contains values extracted by a
* specified `valueSelector` function from items emitted by the source Observable, keyed by a
* specified `keySelector` function.
* specified `keySelector` function. The values having the same key will be put into a `Set`.
*
* <img width="640" height="305" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toMultiMap.png">
*
* @param keySelector the function that extracts a key from the source items to be used as key in the HashMap
* @param valueSelector the function that extracts a value from the source items to be used as value in the HashMap
* @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from
* @param keySelector the function that extracts a key from the source items to be used as key in the `mutable.MultiMap`
* @param valueSelector the function that extracts a value from the source items to be used as value in the `mutable.MultiMap`
* @return an Observable that emits a single item: a `mutable.MultiMap` that contains keys and values mapped from
* the source Observable
*/
def toMultimap[K, V](keySelector: T => K, valueSelector: T => V): Observable[scala.collection.Map[K, Seq[V]]] = {
toMultimap(keySelector, valueSelector, () => mutable.Map[K, mutable.Buffer[V]]())
}

/**
* Returns an Observable that emits a single `mutable.Map[K, mutable.Buffer[V]]`, returned by a specified `mapFactory` function, that
* contains values, extracted by a specified `valueSelector` function from items emitted by the source Observable and
* keyed by the `keySelector` function. `mutable.Map[K, B]` is the same instance create by `mapFactory`.
*
* <img width="640" height="305" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toMultiMap.png">
*
* @param keySelector the function that extracts a key from the source items to be used as the key in the Map
* @param valueSelector the function that extracts a value from the source items to be used as the value in the Map
* @param mapFactory he function that returns a `mutable.Map[K, mutable.Buffer[V]]` instance to be used
* @return an Observable that emits a single item: a `mutable.Map[K, mutable.Buffer[V]]` that contains items mapped
* from the source Observable
*/
def toMultimap[K, V, M <: mutable.Map[K, mutable.Buffer[V]]](keySelector: T => K, valueSelector: T => V, mapFactory: () => M): Observable[M] = {
toMultimap[K, V, mutable.Buffer[V], M](keySelector, valueSelector, mapFactory, k => mutable.Buffer[V]())
def toMultiMap[K, V](keySelector: T => K, valueSelector: T => V): Observable[mutable.MultiMap[K, V]] = {
toMultiMap(keySelector, valueSelector, new mutable.HashMap[K, mutable.Set[V]] with mutable.MultiMap[K, V])
}

/**
* Returns an Observable that emits a single `mutable.Map[K, B]`, returned by a specified `mapFactory` function, that
* Returns an Observable that emits a single `mutable.MultiMap`, returned by a specified `multiMapFactory` function, that
* contains values extracted by a specified `valueSelector` function from items emitted by the source Observable, and
* keyed by the `keySelector` function. `mutable.Map[K, B]` is the same instance create by `mapFactory`.
* keyed by the `keySelector` function. The values having the same key will be put into a `Set`.
*
* <img width="640" height="305" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toMultiMap.png">
*
* @param keySelector the function that extracts a key from the source items to be used as the key in the Map
* @param valueSelector the function that extracts a value from the source items to be used as the value in the Map
* @param mapFactory the function that returns a Map instance to be used
* @param bufferFactory the function that returns a `mutable.Buffer[V]` instance for a particular key to be used in the Map
* @return an Observable that emits a single item: a `mutable.Map[K, B]` that contains mapped items from the source Observable.
* @param keySelector the function that extracts a key from the source items to be used as the key in the `mutable.MultiMap`
* @param valueSelector the function that extracts a value from the source items to be used as the value in the `mutable.MultiMap`
* @param multiMapFactory a `mutable.MultiMap` instance to be used. Note: tis is a by-name parameter.
* @return an Observable that emits a single item: a `mutable.MultiMap` that contains keys and values mapped from the source Observable.
*/
def toMultimap[K, V, B <: mutable.Buffer[V], M <: mutable.Map[K, B]](keySelector: T => K, valueSelector: T => V, mapFactory: () => M, bufferFactory: K => B): Observable[M] = {
// It's complicated to convert `mutable.Map[K, mutable.Buffer[V]]` to `java.util.Map[K, java.util.Collection[V]]`,
// so RxScala implements `toMultimap` directly.
// Choosing `mutable.Buffer/Map` is because `append/update` is necessary to implement an efficient `toMultimap`.
def toMultiMap[K, V, M <: mutable.MultiMap[K, V]](keySelector: T => K, valueSelector: T => V, multiMapFactory: => M): Observable[M] = {
lift {
(subscriber: Subscriber[M]) => {
new Subscriber[T](subscriber) {
val map = mapFactory()
val mm = multiMapFactory

override def onStart(): Unit = request(Long.MaxValue)

override def onNext(t: T): Unit = {
val key = keySelector(t)
val values = map.get(key) match {
case Some(v) => v
case None => bufferFactory(key)
}
values += valueSelector(t)
map += key -> values: Unit
val value = valueSelector(t)
mm.addBinding(key, value)
}

override def onError(e: Throwable): Unit = {
subscriber.onError(e)
}

override def onCompleted(): Unit = {
subscriber.onNext(map)
subscriber.onNext(mm)
subscriber.onCompleted()
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/test/scala/rx/lang/scala/CompletenessTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class CompletenessTest extends JUnitSuite {
val fromFuture = "[TODO: Decide how Scala Futures should relate to Observables. Should there be a " +
"common base interface for Future and Observable? And should Futures also have an unsubscribe method?]"
val commentForTakeLastBuffer = "[use `takeRight(...).toSeq`]"
val commentForToMultimapWithCollectionFactory = "[`toMultiMap` in RxScala returns `mutable.MultiMap`. It's a" +
" `Map[A, mutable.Set[B]]`. You can override `def makeSet: Set[B]` to create a custom Set.]"
val commentForRange = "[The `range` method of the Java Observable takes `start` and `count` parameters, " +
"whereas the `range` method of the Scala Iterable takes `start` and `end` parameters, " +
"so adding any of these two would be confusing. Moreover, since `scala.collection.immutable.Range` is " +
Expand Down Expand Up @@ -182,8 +184,10 @@ class CompletenessTest extends JUnitSuite {
"timer(Long, Long, TimeUnit, Scheduler)" -> "timer(Duration, Duration, Scheduler)",
"toList()" -> "toSeq",
"toMap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, V]])" -> "[mapFactory is not necessary because Scala has `CanBuildFrom`]",
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultimap(T => K, T => V, () => M)",
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> "toMultimap(T => K, T => V, () => M, K => B)",
"toMultimap(Func1[_ >: T, _ <: K])" -> "toMultiMap(T => K)",
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V])" -> "toMultiMap(T => K, T => V)",
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultiMap(T => K, T => V, => M)",
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> commentForToMultimapWithCollectionFactory,
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"window(Int)" -> "tumbling(Int)",
Expand Down
47 changes: 16 additions & 31 deletions src/test/scala/rx/lang/scala/ObservableTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,46 +231,30 @@ class ObservableTests extends JUnitSuite {
}

@Test
def testToMultimap() {
val o = Observable.just("a", "b", "cc", "dd").toMultimap(_.length)
val expected = Map(1 -> List("a", "b"), 2 -> List("cc", "dd"))
def testToMultiMap() {
val o = Observable.just("a", "b", "cc", "dd").toMultiMap(_.length)
val expected = Map(1 -> Set("a", "b"), 2 -> Set("cc", "dd"))
assertEquals(expected, o.toBlocking.single)
}

@Test
def testToMultimapWithValueSelector() {
val o = Observable.just("a", "b", "cc", "dd").toMultimap(_.length, s => s + s)
val expected = Map(1 -> List("aa", "bb"), 2 -> List("cccc", "dddd"))
def testToMultiMapWithValueSelector() {
val o = Observable.just("a", "b", "cc", "dd").toMultiMap(_.length, s => s + s)
val expected = Map(1 -> Set("aa", "bb"), 2 -> Set("cccc", "dddd"))
assertEquals(expected, o.toBlocking.single)
}

@Test
def testToMultimapWithMapFactory() {
val m = mutable.Map[Int, mutable.Buffer[String]]()
val o = Observable.just("a", "b", "cc", "dd").toMultimap(_.length, s => s, () => m)
val expected = Map(1 -> List("a", "b"), 2 -> List("cc", "dd"))
def testToMultiMapWithMapFactory() {
val m = new mutable.LinkedHashMap[Int, mutable.Set[String]] with mutable.MultiMap[Int, String]
val o = Observable.just("a", "b", "cc", "dd").toMultiMap(_.length, s => s, m)
val expected = Map(1 -> Set("a", "b"), 2 -> Set("cc", "dd"))
val r = o.toBlocking.single
// r should be the same instance created by the `mapFactory`
// r should be the same instance created by the `multiMapFactory`
assertTrue(m eq r)
assertEquals(expected, r)
}

@Test
def testToMultimapWithBufferFactory() {
val m = mutable.Map[Int, mutable.Buffer[String]]()
val ls = List(mutable.Buffer[String](), mutable.Buffer[String]())
val o = Observable.just("a", "b", "cc", "dd").toMultimap(_.length, s => s, () => m, (i: Int) => ls(i - 1))
val expected = Map(1 -> List("a", "b"), 2 -> List("cc", "dd"))
val r = o.toBlocking.single
// r should be the same instance created by the `mapFactory`
assertTrue(m eq r)
// r(1) should be the same instance created by the first calling `bufferFactory`
assertTrue(ls(0) eq r(1))
// r(2) should be the same instance created by the second calling `bufferFactory`
assertTrue(ls(1) eq r(2))
assertEquals(expected, r)
}

@Test
def testCreate() {
var called = false
Expand Down Expand Up @@ -408,13 +392,13 @@ class ObservableTests extends JUnitSuite {

@Test
def testToMultimapWithBackpressure() {
var result: scala.collection.Map[Int, scala.collection.Seq[Int]] = null
var result: mutable.MultiMap[Int, Int] = null
var completed = false
var error = false
Observable.just(1, 2, 3, 4).toMultimap(_ % 2).subscribe(new Subscriber[scala.collection.Map[Int, scala.collection.Seq[Int]]] {
Observable.just(1, 2, 3, 4).toMultiMap(_ % 2).subscribe(new Subscriber[mutable.MultiMap[Int, Int]] {
override def onStart(): Unit = request(1)

override def onNext(v: scala.collection.Map[Int, scala.collection.Seq[Int]]): Unit = {
override def onNext(v: mutable.MultiMap[Int, Int]): Unit = {
result = v
request(1)
}
Expand All @@ -423,7 +407,8 @@ class ObservableTests extends JUnitSuite {

override def onCompleted(): Unit = completed = true
})
assertEquals(Map((1, Seq(1, 3)), (0, Seq(2, 4))), result)
val expected = Map(0 -> Set(2, 4), 1 -> Set(1, 3))
assertEquals(expected, result)
assertTrue(completed)
assertFalse(error)
}
Expand Down