Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataSourceCache parameterised to F[_] #160

Merged
merged 11 commits into from
Sep 25, 2018
21 changes: 14 additions & 7 deletions docs/src/main/tut/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ We'll be using the default in-memory cache, prepopulated with some data. The cac
is calculated with the `DataSource`'s `name` method and the request identity.

```tut:silent
val cache = InMemoryCache.from(
def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, Int, User](
(UserSource.name, 1) -> User(1, "@dialelo")
)
```
Expand Down Expand Up @@ -526,7 +526,7 @@ The default cache is implemented as an immutable in-memory map, but users are fr
There is no need for the cache to be mutable since fetch executions run in an interpreter that uses the state monad. Note that the `update` method in the `DataSourceCache` trait yields a new, updated cache.

```scala
trait DataSourceCache {
trait DataSourceCache[F[_]] {
def insert[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A], v: A): DataSourceIdentity, v: A): F[DataSourceCache]
def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are no longer correct, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right!

}
Expand All @@ -535,12 +535,19 @@ trait DataSourceCache {
Let's implement a cache that forgets everything we store in it.

```tut:silent
import cats.Applicative
case class ForgetfulCache[F[_]]() extends DataSourceCache[F] {
def insert[I, A](i: I, v: A, ds: DataSource[I, A])(
implicit C: ConcurrentEffect[F], P: Par[F]
): F[DataSourceCache[F]] =
C.pure(this)

final case class ForgetfulCache() extends DataSourceCache {
def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] = Applicative[F].pure(this)
def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = Applicative[F].pure(None)
def lookup[I, A](i: I, ds: DataSource[I, A])(
implicit C: ConcurrentEffect[F], P: Par[F]
): F[Option[A]] =
C.pure(None)
}

def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]()
```

We can now use our implementation of the cache when running a fetch.
Expand All @@ -551,7 +558,7 @@ def fetchSameTwice[F[_] : ConcurrentEffect]: Fetch[F, (User, User)] = for {
another <- getUser(1)
} yield (one, another)

Fetch.run[IO](fetchSameTwice, ForgetfulCache()).unsafeRunTimed(5.seconds)
Fetch.run[IO](fetchSameTwice, forgetfulCache).unsafeRunTimed(5.seconds)
```

# Batching
Expand Down
63 changes: 36 additions & 27 deletions shared/src/main/scala/cache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package fetch

import cats._
import cats.effect._
import cats.data.NonEmptyList
import cats.instances.list._
import cats.syntax.all._
import cats.effect._
import cats.temp.par._

final class DataSourceName(val name: String) extends AnyVal
final class DataSourceId(val id: Any) extends AnyVal
Expand All @@ -28,42 +30,49 @@ final class DataSourceResult(val result: Any) extends AnyVal
/**
* A `Cache` trait so the users of the library can provide their own cache.
*/
trait DataSourceCache {
def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]]
def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache]
def insertMany[F[_]: ConcurrentEffect, I, A](vs: Map[I, A], ds: DataSource[I, A]): F[DataSourceCache] =
vs.toList.foldLeftM(this)({
case (c, (i, v)) => c.insert(i, v, ds)
})
trait DataSourceCache[F[_]] {
def lookup[I, A](i: I, ds: DataSource[I, A])(
implicit C: ConcurrentEffect[F], P: Par[F]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we didn't put these constraints on the methods anymore.

Now that we have parametrized DataSourceCache on F, the implementation could specify the constraints.
For example for InMemoryCache we would only need Applicative (Monad for foldLeftM).

Copy link
Contributor Author

@purrgrammer purrgrammer Sep 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'm going to try to remove these constraints from the trait.

): F[Option[A]]

def insert[I, A](i: I, v: A, ds: DataSource[I, A])(
implicit C: ConcurrentEffect[F], P: Par[F]
): F[DataSourceCache[F]]

// def delete[I, A](i: I, v: A, ds: DataSource[I, A])(
// implicit C: ConcurrentEffect[F], P: Par[F]
// ): F[Unit]

def bulkInsert[I, A](vs: List[(I, A)], ds: DataSource[I, A])(
implicit C: ConcurrentEffect[F], P: Par[F]
): F[DataSourceCache[F]] = {
vs.foldLeftM(this){
case (acc, (i, v)) =>
acc.insert(i, v, ds)
}
}
}

/**
* A cache that stores its elements in memory.
*/
case class InMemoryCache(state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache {
def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] =
Applicative[F].pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A]))
case class InMemoryCache[F[_]](state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache[F] {
def lookup[I, A](i: I, ds: DataSource[I, A])(
implicit C: ConcurrentEffect[F], P: Par[F]
): F[Option[A]] =
C.pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A]))

def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] =
Applicative[F].pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v))))
def insert[I, A](i: I, v: A, ds: DataSource[I, A])(
implicit C: ConcurrentEffect[F], P: Par[F]
): F[DataSourceCache[F]] =
C.pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v))))
}

object InMemoryCache {
def empty: InMemoryCache = InMemoryCache(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])
def empty[F[_] : ConcurrentEffect : Par]: InMemoryCache[F] = InMemoryCache[F](Map.empty[(DataSourceName, DataSourceId), DataSourceResult])

def from[I, A](results: ((String, I), A)*): InMemoryCache =
InMemoryCache(results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({
def from[F[_]: ConcurrentEffect : Par, I, A](results: ((String, I), A)*): InMemoryCache[F] =
InMemoryCache[F](results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({
case (acc, ((s, i), v)) => acc.updated((new DataSourceName(s), new DataSourceId(i)), new DataSourceResult(v))
}))

implicit val inMemoryCacheMonoid: Monoid[InMemoryCache] = {
implicit val anySemigroup = new Semigroup[Any] {
def combine(a: Any, b: Any): Any = b
}
new Monoid[InMemoryCache] {
def empty: InMemoryCache = InMemoryCache.empty
def combine(c1: InMemoryCache, c2: InMemoryCache): InMemoryCache =
InMemoryCache(c1.state ++ c2.state)
}
}
}
59 changes: 46 additions & 13 deletions shared/src/main/scala/fetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,28 @@ object `package` {
def run[F[_]]: FetchRunner[F] = new FetchRunner[F]

private[fetch] class FetchRunner[F[_]](private val dummy: Boolean = true) extends AnyVal {
def apply[A](
fa: Fetch[F, A]
)(
implicit
P: Par[F],
C: ConcurrentEffect[F],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be this replaced by Monad?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid is not. The effect type F[_] of the DataSource instances requires a ConcurrentEffect[F] and Par[F]. For running F we need ContextShift[F] and Timer[F]so these are the less restrictive implicits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need Sync for Ref.of and the DataSource methods still require ConcurrentEffect at the moment.
So performRun and co need ConcurrentEffect as well.

CS: ContextShift[F],
T: Timer[F]
): F[A] =
apply(fa, InMemoryCache.empty[F])

def apply[A](
fa: Fetch[F, A],
cache: DataSourceCache = InMemoryCache.empty
cache: DataSourceCache[F]
)(
implicit
P: Par[F],
C: ConcurrentEffect[F],
CS: ContextShift[F],
T: Timer[F]
): F[A] = for {
cache <- Ref.of[F, DataSourceCache](cache)
cache <- Ref.of[F, DataSourceCache[F]](cache)
result <- performRun(fa, cache, None)
} yield result
}
Expand All @@ -289,9 +300,20 @@ object `package` {
def runEnv[F[_]]: FetchRunnerEnv[F] = new FetchRunnerEnv[F]

private[fetch] class FetchRunnerEnv[F[_]](private val dummy: Boolean = true) extends AnyVal {
def apply[A](
fa: Fetch[F, A]
)(
implicit
P: Par[F],
C: ConcurrentEffect[F],
CS: ContextShift[F],
T: Timer[F]
): F[(Env, A)] =
apply(fa, InMemoryCache.empty[F])

def apply[A](
fa: Fetch[F, A],
cache: DataSourceCache = InMemoryCache.empty
cache: DataSourceCache[F]
)(
implicit
P: Par[F],
Expand All @@ -300,7 +322,7 @@ object `package` {
T: Timer[F]
): F[(Env, A)] = for {
env <- Ref.of[F, Env](FetchEnv())
cache <- Ref.of[F, DataSourceCache](cache)
cache <- Ref.of[F, DataSourceCache[F]](cache)
result <- performRun(fa, cache, Some(env))
e <- env.get
} yield (e, result)
Expand All @@ -312,17 +334,28 @@ object `package` {
def runCache[F[_]]: FetchRunnerCache[F] = new FetchRunnerCache[F]

private[fetch] class FetchRunnerCache[F[_]](private val dummy: Boolean = true) extends AnyVal {
def apply[A](
fa: Fetch[F, A]
)(
implicit
P: Par[F],
C: ConcurrentEffect[F],
CS: ContextShift[F],
T: Timer[F]
): F[(DataSourceCache[F], A)] =
apply(fa, InMemoryCache.empty[F])

def apply[A](
fa: Fetch[F, A],
cache: DataSourceCache = InMemoryCache.empty
cache: DataSourceCache[F]
)(
implicit
P: Par[F],
C: ConcurrentEffect[F],
CS: ContextShift[F],
T: Timer[F]
): F[(DataSourceCache, A)] = for {
cache <- Ref.of[F, DataSourceCache](cache)
): F[(DataSourceCache[F], A)] = for {
cache <- Ref.of[F, DataSourceCache[F]](cache)
result <- performRun(fa, cache, None)
c <- cache.get
} yield (c, result)
Expand All @@ -332,7 +365,7 @@ object `package` {

private def performRun[F[_], A](
fa: Fetch[F, A],
cache: Ref[F, DataSourceCache],
cache: Ref[F, DataSourceCache[F]],
env: Option[Ref[F, Env]]
)(
implicit
Expand Down Expand Up @@ -360,7 +393,7 @@ object `package` {

private def fetchRound[F[_], A](
rs: RequestMap[F],
cache: Ref[F, DataSourceCache],
cache: Ref[F, DataSourceCache[F]],
env: Option[Ref[F, Env]]
)(
implicit
Expand All @@ -387,7 +420,7 @@ object `package` {

private def runBlockedRequest[F[_], A](
blocked: BlockedRequest[F],
cache: Ref[F, DataSourceCache],
cache: Ref[F, DataSourceCache[F]],
env: Option[Ref[F, Env]]
)(
implicit
Expand All @@ -405,7 +438,7 @@ object `package` {
private def runFetchOne[F[_]](
q: FetchOne[Any, Any],
putResult: FetchStatus => F[Unit],
cache: Ref[F, DataSourceCache],
cache: Ref[F, DataSourceCache[F]],
env: Option[Ref[F, Env]]
)(
implicit
Expand Down Expand Up @@ -450,7 +483,7 @@ object `package` {
private def runBatch[F[_]](
q: Batch[Any, Any],
putResult: FetchStatus => F[Unit],
cache: Ref[F, DataSourceCache],
cache: Ref[F, DataSourceCache[F]],
env: Option[Ref[F, Env]]
)(
implicit
Expand Down Expand Up @@ -493,7 +526,7 @@ object `package` {
endTime <- T.clock.monotonic(MILLISECONDS)
resultMap = combineBatchResults(batchedRequest.results, cachedResults)

updatedCache <- c.insertMany(batchedRequest.results, request.ds)
updatedCache <- c.bulkInsert(batchedRequest.results.toList, request.ds)
_ <- cache.set(updatedCache)

result <- putResult(FetchDone[Map[Any, Any]](resultMap))
Expand Down
12 changes: 6 additions & 6 deletions shared/src/test/scala/FetchAsyncQueryTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers {
implicit val cs: ContextShift[IO] = IO.contextShift(executionContext)

"We can interpret an async fetch into an IO" in {
def fetch[F[_] : ConcurrentEffect]: Fetch[F, Article] =
def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Article] =
article(1)

val io = Fetch.run[IO](fetch)
Expand All @@ -42,7 +42,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers {
}

"We can combine several async data sources and interpret a fetch into an IO" in {
def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Article, Author)] = for {
def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Article, Author)] = for {
art <- article(1)
author <- author(art)
} yield (art, author)
Expand All @@ -53,7 +53,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers {
}

"We can use combinators in a for comprehension and interpret a fetch from async sources into an IO" in {
def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Article]] = for {
def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Article]] = for {
articles <- List(1, 1, 2).traverse(article[F])
} yield articles

Expand All @@ -67,7 +67,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers {
}

"We can use combinators and multiple sources in a for comprehension and interpret a fetch from async sources into an IO" in {
def fetch[F[_] : ConcurrentEffect] = for {
def fetch[F[_] : ConcurrentEffect : Par] = for {
articles <- List(1, 1, 2).traverse(article[F])
authors <- articles.traverse(author[F])
} yield (articles, authors)
Expand Down Expand Up @@ -108,7 +108,7 @@ object DataSources {
})
}

def article[F[_] : ConcurrentEffect](id: Int): Fetch[F, Article] =
def article[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, Article] =
Fetch(ArticleId(id), ArticleAsync)

case class AuthorId(id: Int)
Expand All @@ -127,6 +127,6 @@ object DataSources {
}))
}

def author[F[_] : ConcurrentEffect](a: Article): Fetch[F, Author] =
def author[F[_] : ConcurrentEffect : Par](a: Article): Fetch[F, Author] =
Fetch(AuthorId(a.author), AuthorAsync)
}
Loading