-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DataSourceCache parameterised to F[_] #160
Changes from 10 commits
3f4d3df
022098d
15bf018
79a933a
62e30a2
9e08d60
a5cf2c5
455e005
11de8d3
060c3d5
3c19563
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -268,17 +268,28 @@ object `package` { | |
def run[F[_]]: FetchRunner[F] = new FetchRunner[F] | ||
|
||
private[fetch] class FetchRunner[F[_]](private val dummy: Boolean = true) extends AnyVal { | ||
def apply[A]( | ||
fa: Fetch[F, A] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could be this replaced by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm afraid is not. The effect type There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need |
||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[A] = | ||
apply(fa, InMemoryCache.empty[F]) | ||
|
||
def apply[A]( | ||
fa: Fetch[F, A], | ||
cache: DataSourceCache = InMemoryCache.empty | ||
cache: DataSourceCache[F] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[A] = for { | ||
cache <- Ref.of[F, DataSourceCache](cache) | ||
cache <- Ref.of[F, DataSourceCache[F]](cache) | ||
result <- performRun(fa, cache, None) | ||
} yield result | ||
} | ||
|
@@ -289,9 +300,20 @@ object `package` { | |
def runEnv[F[_]]: FetchRunnerEnv[F] = new FetchRunnerEnv[F] | ||
|
||
private[fetch] class FetchRunnerEnv[F[_]](private val dummy: Boolean = true) extends AnyVal { | ||
def apply[A]( | ||
fa: Fetch[F, A] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[(Env, A)] = | ||
apply(fa, InMemoryCache.empty[F]) | ||
|
||
def apply[A]( | ||
fa: Fetch[F, A], | ||
cache: DataSourceCache = InMemoryCache.empty | ||
cache: DataSourceCache[F] | ||
)( | ||
implicit | ||
P: Par[F], | ||
|
@@ -300,7 +322,7 @@ object `package` { | |
T: Timer[F] | ||
): F[(Env, A)] = for { | ||
env <- Ref.of[F, Env](FetchEnv()) | ||
cache <- Ref.of[F, DataSourceCache](cache) | ||
cache <- Ref.of[F, DataSourceCache[F]](cache) | ||
result <- performRun(fa, cache, Some(env)) | ||
e <- env.get | ||
} yield (e, result) | ||
|
@@ -312,17 +334,28 @@ object `package` { | |
def runCache[F[_]]: FetchRunnerCache[F] = new FetchRunnerCache[F] | ||
|
||
private[fetch] class FetchRunnerCache[F[_]](private val dummy: Boolean = true) extends AnyVal { | ||
def apply[A]( | ||
fa: Fetch[F, A] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[(DataSourceCache[F], A)] = | ||
apply(fa, InMemoryCache.empty[F]) | ||
|
||
def apply[A]( | ||
fa: Fetch[F, A], | ||
cache: DataSourceCache = InMemoryCache.empty | ||
cache: DataSourceCache[F] | ||
)( | ||
implicit | ||
P: Par[F], | ||
C: ConcurrentEffect[F], | ||
CS: ContextShift[F], | ||
T: Timer[F] | ||
): F[(DataSourceCache, A)] = for { | ||
cache <- Ref.of[F, DataSourceCache](cache) | ||
): F[(DataSourceCache[F], A)] = for { | ||
cache <- Ref.of[F, DataSourceCache[F]](cache) | ||
result <- performRun(fa, cache, None) | ||
c <- cache.get | ||
} yield (c, result) | ||
|
@@ -332,7 +365,7 @@ object `package` { | |
|
||
private def performRun[F[_], A]( | ||
fa: Fetch[F, A], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -360,7 +393,7 @@ object `package` { | |
|
||
private def fetchRound[F[_], A]( | ||
rs: RequestMap[F], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -387,7 +420,7 @@ object `package` { | |
|
||
private def runBlockedRequest[F[_], A]( | ||
blocked: BlockedRequest[F], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -405,7 +438,7 @@ object `package` { | |
private def runFetchOne[F[_]]( | ||
q: FetchOne[Any, Any], | ||
putResult: FetchStatus => F[Unit], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -450,7 +483,7 @@ object `package` { | |
private def runBatch[F[_]]( | ||
q: Batch[Any, Any], | ||
putResult: FetchStatus => F[Unit], | ||
cache: Ref[F, DataSourceCache], | ||
cache: Ref[F, DataSourceCache[F]], | ||
env: Option[Ref[F, Env]] | ||
)( | ||
implicit | ||
|
@@ -493,7 +526,7 @@ object `package` { | |
endTime <- T.clock.monotonic(MILLISECONDS) | ||
resultMap = combineBatchResults(batchedRequest.results, cachedResults) | ||
|
||
updatedCache <- c.insertMany(batchedRequest.results, request.ds) | ||
updatedCache <- c.bulkInsert(batchedRequest.results.toList, request.ds) | ||
_ <- cache.set(updatedCache) | ||
|
||
result <- putResult(FetchDone[Map[Any, Any]](resultMap)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are no longer correct, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right!