-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DataSourceCache parameterised to F[_] #160
Changes from 7 commits
3f4d3df
022098d
15bf018
79a933a
62e30a2
9e08d60
a5cf2c5
455e005
11de8d3
060c3d5
3c19563
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,9 +17,11 @@ | |
package fetch | ||
|
||
import cats._ | ||
import cats.effect._ | ||
import cats.data.NonEmptyList | ||
import cats.instances.list._ | ||
import cats.syntax.all._ | ||
import cats.effect._ | ||
import cats.temp.par._ | ||
|
||
final class DataSourceName(val name: String) extends AnyVal | ||
final class DataSourceId(val id: Any) extends AnyVal | ||
|
@@ -28,42 +30,49 @@ final class DataSourceResult(val result: Any) extends AnyVal | |
/** | ||
* A `Cache` trait so the users of the library can provide their own cache. | ||
*/ | ||
trait DataSourceCache { | ||
def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] | ||
def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] | ||
def insertMany[F[_]: ConcurrentEffect, I, A](vs: Map[I, A], ds: DataSource[I, A]): F[DataSourceCache] = | ||
vs.toList.foldLeftM(this)({ | ||
case (c, (i, v)) => c.insert(i, v, ds) | ||
}) | ||
trait DataSourceCache[F[_]] { | ||
def lookup[I, A](i: I, ds: DataSource[I, A])( | ||
implicit C: ConcurrentEffect[F], P: Par[F] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if we didn't put these constraints on the methods anymore. Now that we have parametrized There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, I'm going to try to remove these constraints from the trait. |
||
): F[Option[A]] | ||
|
||
def insert[I, A](i: I, v: A, ds: DataSource[I, A])( | ||
implicit C: ConcurrentEffect[F], P: Par[F] | ||
): F[DataSourceCache[F]] | ||
|
||
// def delete[I, A](i: I, v: A, ds: DataSource[I, A])( | ||
// implicit C: ConcurrentEffect[F], P: Par[F] | ||
// ): F[Unit] | ||
|
||
def bulkInsert[I, A](vs: List[(I, A)], ds: DataSource[I, A])( | ||
implicit C: ConcurrentEffect[F], P: Par[F] | ||
): F[DataSourceCache[F]] = { | ||
vs.foldLeftM(this){ | ||
case (acc, (i, v)) => | ||
acc.insert(i, v, ds) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* A cache that stores its elements in memory. | ||
*/ | ||
case class InMemoryCache(state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache { | ||
def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = | ||
Applicative[F].pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A])) | ||
case class InMemoryCache[F[_]](state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache[F] { | ||
def lookup[I, A](i: I, ds: DataSource[I, A])( | ||
implicit C: ConcurrentEffect[F], P: Par[F] | ||
): F[Option[A]] = | ||
C.pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A])) | ||
|
||
def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] = | ||
Applicative[F].pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v)))) | ||
def insert[I, A](i: I, v: A, ds: DataSource[I, A])( | ||
implicit C: ConcurrentEffect[F], P: Par[F] | ||
): F[DataSourceCache[F]] = | ||
C.pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v)))) | ||
} | ||
|
||
object InMemoryCache { | ||
def empty: InMemoryCache = InMemoryCache(Map.empty[(DataSourceName, DataSourceId), DataSourceResult]) | ||
def empty[F[_] : ConcurrentEffect : Par]: InMemoryCache[F] = InMemoryCache[F](Map.empty[(DataSourceName, DataSourceId), DataSourceResult]) | ||
|
||
def from[I, A](results: ((String, I), A)*): InMemoryCache = | ||
InMemoryCache(results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({ | ||
def from[F[_]: ConcurrentEffect : Par, I, A](results: ((String, I), A)*): InMemoryCache[F] = | ||
InMemoryCache[F](results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({ | ||
case (acc, ((s, i), v)) => acc.updated((new DataSourceName(s), new DataSourceId(i)), new DataSourceResult(v)) | ||
})) | ||
|
||
implicit val inMemoryCacheMonoid: Monoid[InMemoryCache] = { | ||
implicit val anySemigroup = new Semigroup[Any] { | ||
def combine(a: Any, b: Any): Any = b | ||
} | ||
new Monoid[InMemoryCache] { | ||
def empty: InMemoryCache = InMemoryCache.empty | ||
def combine(c1: InMemoryCache, c2: InMemoryCache): InMemoryCache = | ||
InMemoryCache(c1.state ++ c2.state) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -268,17 +268,28 @@ object `package` { | |
def run[F[_]]: FetchRunner[F] = new FetchRunner[F] | ||
|
||
private[fetch] class FetchRunner[F[_]](private val dummy: Boolean = true) extends AnyVal { | ||
def apply[A]( | ||
fa: Fetch[F, A] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could be this replaced by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm afraid is not. The effect type There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need |
||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[A] = | ||
apply(fa, InMemoryCache.empty[F]) | ||
|
||
def apply[A]( | ||
fa: Fetch[F, A], | ||
cache: DataSourceCache = InMemoryCache.empty | ||
cache: DataSourceCache[F] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[A] = for { | ||
cache <- Ref.of[F, DataSourceCache](cache) | ||
cache <- Ref.of[F, DataSourceCache[F]](cache) | ||
result <- performRun(fa, cache, None) | ||
} yield result | ||
} | ||
|
@@ -289,9 +300,20 @@ object `package` { | |
def runEnv[F[_]]: FetchRunnerEnv[F] = new FetchRunnerEnv[F] | ||
|
||
private[fetch] class FetchRunnerEnv[F[_]](private val dummy: Boolean = true) extends AnyVal { | ||
def apply[A]( | ||
fa: Fetch[F, A] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[(Env, A)] = | ||
apply(fa, InMemoryCache.empty[F]) | ||
|
||
def apply[A]( | ||
fa: Fetch[F, A], | ||
cache: DataSourceCache = InMemoryCache.empty | ||
cache: DataSourceCache[F] | ||
)( | ||
implicit | ||
P: Par[F], | ||
|
@@ -300,7 +322,7 @@ object `package` { | |
T: Timer[F] | ||
): F[(Env, A)] = for { | ||
env <- Ref.of[F, Env](FetchEnv()) | ||
cache <- Ref.of[F, DataSourceCache](cache) | ||
cache <- Ref.of[F, DataSourceCache[F]](cache) | ||
result <- performRun(fa, cache, Some(env)) | ||
e <- env.get | ||
} yield (e, result) | ||
|
@@ -312,17 +334,28 @@ object `package` { | |
def runCache[F[_]]: FetchRunnerCache[F] = new FetchRunnerCache[F] | ||
|
||
private[fetch] class FetchRunnerCache[F[_]](private val dummy: Boolean = true) extends AnyVal { | ||
def apply[A]( | ||
fa: Fetch[F, A] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[(DataSourceCache[F], A)] = | ||
apply(fa, InMemoryCache.empty[F]) | ||
|
||
def apply[A]( | ||
fa: Fetch[F, A], | ||
cache: DataSourceCache = InMemoryCache.empty | ||
cache: DataSourceCache[F] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[(DataSourceCache, A)] = for { | ||
cache <- Ref.of[F, DataSourceCache](cache) | ||
): F[(DataSourceCache[F], A)] = for { | ||
cache <- Ref.of[F, DataSourceCache[F]](cache) | ||
result <- performRun(fa, cache, None) | ||
c <- cache.get | ||
} yield (c, result) | ||
|
@@ -332,7 +365,7 @@ object `package` { | |
|
||
private def performRun[F[_], A]( | ||
fa: Fetch[F, A], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -360,7 +393,7 @@ object `package` { | |
|
||
private def fetchRound[F[_], A]( | ||
rs: RequestMap[F], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -387,7 +420,7 @@ object `package` { | |
|
||
private def runBlockedRequest[F[_], A]( | ||
blocked: BlockedRequest[F], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -405,7 +438,7 @@ object `package` { | |
private def runFetchOne[F[_]]( | ||
q: FetchOne[Any, Any], | ||
putResult: FetchStatus => F[Unit], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -450,7 +483,7 @@ object `package` { | |
private def runBatch[F[_]]( | ||
q: Batch[Any, Any], | ||
putResult: FetchStatus => F[Unit], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -493,7 +526,7 @@ object `package` { | |
endTime <- T.clock.monotonic(MILLISECONDS) | ||
resultMap = combineBatchResults(batchedRequest.results, cachedResults) | ||
|
||
updatedCache <- c.insertMany(batchedRequest.results, request.ds) | ||
updatedCache <- c.bulkInsert(batchedRequest.results.toList, request.ds) | ||
_ <- cache.set(updatedCache) | ||
|
||
result <- putResult(FetchDone[Map[Any, Any]](resultMap)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are no longer correct, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right!