Reduce stack consumption #95

Merged: 12 commits, Mar 24, 2017
3 changes: 1 addition & 2 deletions .travis.yml
@@ -11,8 +11,7 @@ before_install:
-in secring.gpg.enc -out secring.gpg -d; fi
script:
- sbt coverage 'fetchJVM/test' 'fetchJVM/coverageReport'
- sbt ++$TRAVIS_SCALA_VERSION 'monixJVM/test' 'monixJS/test'
- sbt ++$TRAVIS_SCALA_VERSION 'fetchJS/test'
- sbt ++$TRAVIS_SCALA_VERSION 'tests/test'
- sbt ++$TRAVIS_SCALA_VERSION 'docs/tut'
- sbt ++$TRAVIS_SCALA_VERSION 'readme/tut'
- sbt 'examples/test'
12 changes: 9 additions & 3 deletions build.sbt
@@ -117,9 +117,9 @@ lazy val readmeSettings = buildSettings ++ tutSettings ++ Seq(
tutTargetDirectory := baseDirectory.value.getParentFile,
tutScalacOptions ~= (_.filterNot(Set("-Ywarn-unused-import", "-Ywarn-dead-code"))),
tutScalacOptions ++= (scalaBinaryVersion.value match {
case "2.10" => Seq("-Xdivergence211")
case _ => Nil
}),
case "2.10" => Seq("-Xdivergence211")
case _ => Nil
}),
tutNameFilter := """README.md""".r
)

@@ -179,3 +179,9 @@ lazy val examples = (project in file("examples"))
.settings(commonSettings: _*)
.settings(noPublishSettings: _*)
.settings(examplesSettings: _*)

lazy val tests = (project in file("."))
.aggregate(fetchJVM, fetchMonixJVM, debugJVM)
.settings(buildSettings)
.settings(commonSettings)
.settings(noPublishSettings)
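
The new `tests` aggregate is what lets the Travis script above run a single `sbt tests/test` instead of separate invocations per module. A minimal sketch of how sbt aggregation behaves (project names below are hypothetical, not this build's):

// Commands run on an aggregate project are forwarded to every aggregated project,
// so `sbt tests/test` runs the `test` task of each module it aggregates.
lazy val core  = project in file("core")
lazy val extra = project in file("extra")

lazy val tests = (project in file("."))
  .aggregate(core, extra) // `tests/test` triggers core/test and extra/test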
32 changes: 22 additions & 10 deletions shared/src/main/scala/interpreters.scala
@@ -17,6 +17,7 @@
package fetch

import scala.collection.immutable._
import scala.annotation.tailrec

import cats.{Applicative, ApplicativeError, Id, Monad, MonadError, Semigroup, ~>}
import cats.data.{Coproduct, EitherT, Ior, NonEmptyList, OptionT, StateT, Validated, ValidatedNel}
@@ -350,20 +351,29 @@ trait FetchInterpreters {
// independent queries, but this also has the consequence that pure
// values can be executed multiple times.
// eg : Fetch.pure(5).map { i => println("hello"); i * 2 }
FreeTopExt.inspect(f.step).foldMap {
case Join(ffl, ffr) => independentQueries(ffl) ++ independentQueries(ffr)
case one @ FetchOne(_, _) => one :: Nil
case many @ FetchMany(_, _) => many :: Nil
case _ => Nil
independentQueriesRec(f, List.empty)

@tailrec
private[this] def independentQueriesRec(
f: Fetch[_],
acc: List[FetchQuery[_, _]] = List.empty): List[FetchQuery[_, _]] =
FreeTopExt.inspect(f.step) match {
case Some(Join(ffl, ffr)) => {
val nacc = independentQueries(ffl)
independentQueriesRec(ffr, acc ++ nacc)
}
case Some(one @ FetchOne(_, _)) => one :: acc
case Some(many @ FetchMany(_, _)) => many :: acc
case _ => acc
}
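
This is the heart of the stack reduction: the previous `foldMap` made two non-tail recursive calls for every `Join`, so long chains of joined fetches grew the call stack, whereas `independentQueriesRec` is annotated `@tailrec` and walks the right branch of each `Join` in a loop, carrying the queries found so far in `acc`. A self-contained sketch of the same accumulator pattern on a toy tree (toy types, for illustration only, not the library's):

import scala.annotation.tailrec

sealed trait Tree
case class Leaf(value: Int)              extends Tree
case class Node(left: Tree, right: Tree) extends Tree

// Naive version: two recursive calls per Node, so stack depth grows with nesting.
def leaves(t: Tree): List[Int] = t match {
  case Leaf(v)    => v :: Nil
  case Node(l, r) => leaves(l) ++ leaves(r)
}

// Accumulator version mirroring independentQueriesRec: the right branch is a tail
// call, so right-nested chains are handled in constant stack. Result order may
// differ from the naive version, which is fine when the elements are independent.
@tailrec
def leavesRec(t: Tree, acc: List[Int] = Nil): List[Int] = t match {
  case Leaf(v)    => v :: acc
  case Node(l, r) => leavesRec(r, acc ++ leaves(l))
}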

/**
* Use a `DataSourceCache` to optimize a `FetchOp`.
* If the cache contains all the fetch identities, the fetch doesn't need to be
* executed and can be replaced by cached results.
*/
private[this] def simplify[A](cache: InMemoryCache)(fetch: Fetch[A]): Fetch[A] =
FreeTopExt.modify(fetch)(
private[this] def simplify[A](cache: InMemoryCache)(fetch: Fetch[A]): Fetch[A] = {
val interpreter: (FetchOp ~> Coproduct[FetchOp, Id, ?]) =
new (FetchOp ~> Coproduct[FetchOp, Id, ?]) {
def apply[X](fetchOp: FetchOp[X]): Coproduct[FetchOp, Id, X] = fetchOp match {
case one @ FetchOne(id, ds) =>
@@ -372,15 +382,17 @@ trait FetchInterpreters {
val fetched = ids.traverse(id => cache.get(ds.identity(id)))
Coproduct[FetchOp, Id, X](fetched.map(_.toList).toRight(many))
case join @ Join(fl, fr) =>
val sfl = simplify(cache)(fl)
val sfr = simplify(cache)(fr)
val sfl = FreeTopExt.modify(fl)(this)
val sfr = FreeTopExt.modify(fr)(this)
val optTuple = (FreeTopExt.inspectPure(sfl) |@| FreeTopExt.inspectPure(sfr)).tupled
Coproduct[FetchOp, Id, X](optTuple.toRight(Join(sfl, sfr)))
case other =>
Coproduct.leftc(other)
}
}
)

FreeTopExt.modify(fetch)(interpreter)
}
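
Note that for nested `Join`s, `simplify` now reuses the same natural transformation (`this`) through `FreeTopExt.modify` rather than calling `simplify(cache)` again for each side. The idea it implements, dropping a query when every requested identity is already cached, can be sketched in isolation with a plain `Map` standing in for the cache (hypothetical helper, not the library's API):

// All-or-nothing cache lookup: a batched query is replaced only when every id is cached.
def simplifyMany[I, A](ids: List[I], cache: Map[I, A]): Either[List[I], List[A]] = {
  val cached = ids.flatMap(cache.get)
  if (cached.size == ids.size) Right(cached) // fully cached: no fetch needed
  else Left(ids)                             // anything missing: keep the original query
}

// simplifyMany(List(1, 2), Map(1 -> "a", 2 -> "b")) == Right(List("a", "b"))
// simplifyMany(List(1, 2), Map(1 -> "a"))           == Left(List(1, 2))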

/**
* Combine multiple queries so the resulting `List` only contains one `FetchQuery`
146 changes: 146 additions & 0 deletions shared/src/test/scala/FetchBatchingTests.scala
@@ -0,0 +1,146 @@
/*
* Copyright 2016 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fetch

import scala.concurrent._
import scala.concurrent.duration._

import org.scalatest._

import cats.MonadError
import cats.data.NonEmptyList
import cats.instances.list._

import fetch._
import fetch.implicits._
import cats.syntax.cartesian._
import cats.syntax.foldable._

class FetchBatchingTests extends AsyncFreeSpec with Matchers {
import TestHelper._

implicit override def executionContext = ExecutionContext.Implicits.global

case class BatchedDataSeq(id: Int)
implicit object MaxBatchSourceSeq extends DataSource[BatchedDataSeq, Int] {
override def name = "BatchSourceSeq"
override def fetchOne(id: BatchedDataSeq): Query[Option[Int]] = {
Query.sync(Option(id.id))
}
override def fetchMany(ids: NonEmptyList[BatchedDataSeq]): Query[Map[BatchedDataSeq, Int]] =
Query.sync(ids.toList.map(one => (one, one.id)).toMap)

override val maxBatchSize = Some(2)

override val batchExecution = Sequential
}

case class BatchedDataPar(id: Int)
implicit object MaxBatchSourcePar extends DataSource[BatchedDataPar, Int] {
override def name = "BatchSourcePar"
override def fetchOne(id: BatchedDataPar): Query[Option[Int]] = {
Query.sync(Option(id.id))
}
override def fetchMany(ids: NonEmptyList[BatchedDataPar]): Query[Map[BatchedDataPar, Int]] =
Query.sync(ids.toList.map(one => (one, one.id)).toMap)

override val maxBatchSize = Some(2)

override val batchExecution = Parallel
}

def fetchBatchedDataSeq(id: Int): Fetch[Int] = Fetch(BatchedDataSeq(id))
def fetchBatchedDataPar(id: Int): Fetch[Int] = Fetch(BatchedDataPar(id))

"A large fetch to a datasource with a maximum batch size is split and executed in sequence" in {
val fetch: Fetch[List[Int]] = Fetch.traverse(List.range(1, 6))(fetchBatchedDataSeq)
Fetch.runFetch[Future](fetch).map {
case (env, res) =>
res shouldEqual List(1, 2, 3, 4, 5)
totalFetched(env.rounds) shouldEqual 5
totalBatches(env.rounds) shouldEqual 2
env.rounds.size shouldEqual 3
}
}

"A large fetch to a datasource with a maximum batch size is split and executed in parallel" in {
val fetch: Fetch[List[Int]] = Fetch.traverse(List.range(1, 6))(fetchBatchedDataPar)
Fetch.runFetch[Future](fetch).map {
case (env, res) =>
res shouldEqual List(1, 2, 3, 4, 5)
totalFetched(env.rounds) shouldEqual 5
totalBatches(env.rounds) shouldEqual 2
env.rounds.size shouldEqual 1
}
}

"Fetches to datasources with a maximum batch size should be split and executed in parallel and sequentially" in {
val fetch: Fetch[List[Int]] =
Fetch.traverse(List.range(1, 6))(fetchBatchedDataPar) *>
Fetch.traverse(List.range(1, 6))(fetchBatchedDataSeq)

Fetch.runFetch[Future](fetch).map {
case (env, res) =>
res shouldEqual List(1, 2, 3, 4, 5)
totalFetched(env.rounds) shouldEqual 5 + 5
totalBatches(env.rounds) shouldEqual 2 + 2
env.rounds.size shouldEqual 3
}
}

"A large (many) fetch to a datasource with a maximum batch size is split and executed in sequence" in {
val fetch: Fetch[List[Int]] =
Fetch.multiple(BatchedDataSeq(1), BatchedDataSeq(2), BatchedDataSeq(3))
Fetch.runFetch[Future](fetch).map {
case (env, res) =>
res shouldEqual List(1, 2, 3)
totalFetched(env.rounds) shouldEqual 3
totalBatches(env.rounds) shouldEqual 2 // FetchMany(NEL(1, 2)) and FetchMany(NEL(3))
env.rounds.size shouldEqual 2
}
}

"A large (many) fetch to a datasource with a maximum batch size is split and executed in parallel" in {
val fetch: Fetch[List[Int]] =
Fetch.multiple(BatchedDataPar(1), BatchedDataPar(2), BatchedDataPar(3))
Fetch.runFetch[Future](fetch).map {
case (env, res) =>
res shouldEqual List(1, 2, 3)
totalFetched(env.rounds) shouldEqual 3
totalBatches(env.rounds) shouldEqual 1
env.rounds.size shouldEqual 1
}
}

"Very deep fetches don't overflow stack or heap" in {
import cats.syntax.traverse._

val depth = 200
val fetch: Fetch[List[Int]] = (1 to depth).toList
.map((x) => (0 until 2).toList.traverse(fetchBatchedDataSeq))
.foldM(
List.empty[Int]
)((_, f) => f)

Fetch.runFetch[Future](fetch).map {
case (env, res) =>
res shouldEqual List(0, 1)
totalFetched(env.rounds) shouldEqual 2
env.rounds.size shouldEqual 1
}
}
}
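
The round counts in these tests follow from how `maxBatchSize` splits a request: five identities against a limit of two are grouped as [1, 2], [3, 4], [5], and with `Sequential` batch execution each group runs in its own round (three rounds), while `Parallel` runs all groups within a single round. A one-line sketch of the grouping (assumed splitting semantics):

// Identities are chunked to the configured maximum before being handed to fetchMany.
List(1, 2, 3, 4, 5).grouped(2).toList
// => List(List(1, 2), List(3, 4), List(5))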
144 changes: 144 additions & 0 deletions shared/src/test/scala/FetchReportingTests.scala
@@ -0,0 +1,144 @@
/*
* Copyright 2016 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fetch

import scala.concurrent._
import scala.concurrent.duration._

import org.scalatest._

import cats.MonadError
import cats.data.NonEmptyList
import cats.instances.list._

import fetch._
import fetch.implicits._

class FetchReportingTests extends AsyncFreeSpec with Matchers {
import TestHelper._

val ME = implicitly[FetchMonadError[Future]]

implicit override def executionContext = ExecutionContext.Implicits.global

"Plain values have no rounds of execution" in {
val fetch: Fetch[Int] = Fetch.pure(42)
Fetch.runEnv[Future](fetch).map(_.rounds.size shouldEqual 0)
}

"Single fetches are executed in one round" in {
val fetch = one(1)
Fetch.runEnv[Future](fetch).map(_.rounds.size shouldEqual 1)
}

"Single fetches are executed in one round per binding in a for comprehension" in {
val fetch = for {
o <- one(1)
t <- one(2)
} yield (o, t)

Fetch.runEnv[Future](fetch).map(_.rounds.size shouldEqual 2)
}

"Single fetches for different data sources are executed in multiple rounds if they are in a for comprehension" in {
val fetch: Fetch[(Int, List[Int])] = for {
o <- one(1)
m <- many(3)
} yield (o, m)

Fetch.runEnv[Future](fetch).map(_.rounds.size shouldEqual 2)
}

"Single fetches combined with cartesian are run in one round" in {
import cats.syntax.cartesian._

val fetch: Fetch[(Int, List[Int])] = (one(1) |@| many(3)).tupled
val fut = Fetch.runEnv[Future](fetch)

fut.map(_.rounds.size shouldEqual 1)
}

"Single fetches combined with traverse are run in one round" in {
import cats.syntax.traverse._

val fetch: Fetch[List[Int]] = for {
manies <- many(3)
ones <- manies.traverse(one)
} yield ones

val fut = Fetch.runEnv[Future](fetch)
fut.map(_.rounds.size shouldEqual 2)
}

"The product of two fetches from the same data source implies batching" in {
val fetch: Fetch[(Int, Int)] = Fetch.join(one(1), one(3))

Fetch
.runEnv[Future](fetch)
.map(env => {
env.rounds.size shouldEqual 1
})
}

"The product of concurrent fetches of the same type implies everything fetched in batches" in {
val fetch = Fetch.join(
Fetch.join(
for {
a <- one(1)
b <- one(2)
c <- one(3)
} yield c,
for {
a <- one(2)
m <- many(4)
c <- one(3)
} yield c
),
one(3)
)

Fetch
.runEnv[Future](fetch)
.map(env => {
env.rounds.size shouldEqual 2
})
}

"Every level of sequenced concurrent of concurrent fetches is batched" in {
val fetch = Fetch.join(
Fetch.join(
for {
a <- Fetch.sequence(List(one(2), one(3), one(4)))
b <- Fetch.sequence(List(many(0), many(1)))
c <- Fetch.sequence(List(one(9), one(10), one(11)))
} yield c,
for {
a <- Fetch.sequence(List(one(5), one(6), one(7)))
b <- Fetch.sequence(List(many(2), many(3)))
c <- Fetch.sequence(List(one(12), one(13), one(14)))
} yield c
),
Fetch.sequence(List(one(15), one(16), one(17)))
)

Fetch
.runEnv[Future](fetch)
.map(env => {
env.rounds.size shouldEqual 3
})
}
}
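
The common thread in these tests: each monadic bind on an actual fetch in a for comprehension starts a new round, because the next fetch may depend on the previous result, whereas applicative combinations (`|@|`, `Fetch.join`, `Fetch.sequence`, `Fetch.traverse`) expose independent fetches up front so they can be batched into fewer rounds. A condensed sketch reusing `one` from `TestHelper`:

import cats.syntax.cartesian._

// Two binds: two rounds are reported, since the second fetch could depend on the first.
val sequential: Fetch[(Int, Int)] = for {
  a <- one(1)
  b <- one(2)
} yield (a, b)

// Applicative product: both fetches are known up front and run batched in one round.
val batched: Fetch[(Int, Int)] = (one(1) |@| one(2)).tupled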