Remove Par typeclass #166

Merged
merged 12 commits on Nov 12, 2018
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -19,9 +19,9 @@ Summary of changes:
* `Query` and `FetchMonadError` types deleted
* `Fetch#traverse`, `Fetch#sequence`, `Fetch#join` & `Fetch#collect` deleted in favor of using cats typeclass ops
* Introduction of `cats-effect` for the implementation and target types
- `DataSource` in terms of `ConcurrentEffect` from `cats-effect` and `Par` from `cats-par`
- `DataSource` in terms of `ConcurrentEffect` from `cats-effect`
- `DataSourceCache` in terms of `ConcurrentEffect`
- `Fetch` is now parameterised to `F[_]` with a `ConcurrentEffect[F]` and `Par[F]`
- `Fetch` is now parameterised to `F[_]` with a `ConcurrentEffect[F]`
- `Fetch#apply` now doesn't require an implicit `DataSource` but it must be provided explicitly
- `Fetch#run` now requires a `Timer[F]` and `ContextShift[F]` from `cats-effect` (see the setup sketch below)
- Removed Monix, Future and Twitter Future subprojects, most of them should work with `cats-effect` abstractions already
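As a sketch of what the `Fetch#run` bullet above means for callers (not part of the changelog itself; the value names are illustrative), running a fetch to `IO` now needs a `Timer[IO]` and a `ContextShift[IO]` in scope:

```scala
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO, Timer}

// Illustrative setup: any ExecutionContext will do here.
val executionContext = ExecutionContext.Implicits.global

implicit val timer: Timer[IO] = IO.timer(executionContext)
implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)

// With both instances in scope, `Fetch.run[IO](someFetch)` produces a plain `IO` value.
```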
53 changes: 26 additions & 27 deletions README.md
@@ -2,7 +2,7 @@

[comment]: # (Start Badges)

[![Join the chat at https://gitter.im/47deg/fetch](https://badges.gitter.im/47deg/fetch.svg)](https://gitter.im/47deg/fetch?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![Build Status](https://travis-ci.org/47deg/fetch.svg?branch=master)](https://travis-ci.org/47deg/fetch) [![codecov.io](http://codecov.io/github/47deg/fetch/coverage.svg?branch=master)](http://codecov.io/github/47deg/fetch?branch=master) [![Maven Central](https://img.shields.io/badge/maven%20central-1.0.0-RC1-green.svg)](https://oss.sonatype.org/#nexus-search;gav~com.47deg~fetch*) [![License](https://img.shields.io/badge/license-Apache%202-blue.svg)](https://raw.githubusercontent.com/47deg/fetch/master/LICENSE) [![Latest version](https://img.shields.io/badge/fetch-1.0.0-RC1-green.svg)](https://index.scala-lang.org/47deg/fetch) [![Scala.js](http://scala-js.org/assets/badges/scalajs-0.6.17.svg)](http://scala-js.org) [![GitHub Issues](https://img.shields.io/github/issues/47deg/fetch.svg)](/~https://github.com/47deg/fetch/issues)
[![Join the chat at https://gitter.im/47deg/fetch](https://badges.gitter.im/47deg/fetch.svg)](https://gitter.im/47deg/fetch?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![Build Status](https://travis-ci.org/47deg/fetch.svg?branch=master)](https://travis-ci.org/47deg/fetch) [![codecov.io](http://codecov.io/github/47deg/fetch/coverage.svg?branch=master)](http://codecov.io/github/47deg/fetch?branch=master) [![Maven Central](https://img.shields.io/badge/maven%20central-0.6.1-green.svg)](https://oss.sonatype.org/#nexus-search;gav~com.47deg~fetch*) [![License](https://img.shields.io/badge/license-Apache%202-blue.svg)](https://raw.githubusercontent.com/47deg/fetch/master/LICENSE) [![Latest version](https://img.shields.io/badge/fetch-0.6.1-green.svg)](https://index.scala-lang.org/47deg/fetch) [![Scala.js](http://scala-js.org/assets/badges/scalajs-0.6.15.svg)](http://scala-js.org) [![GitHub Issues](https://img.shields.io/github/issues/47deg/fetch.svg)](/~https://github.com/47deg/fetch/issues)

[comment]: # (End Badges)

@@ -52,12 +52,11 @@ Data Sources take two type parameters:
```scala
import cats.data.NonEmptyList
import cats.effect.ConcurrentEffect
import cats.temp.par.Par

trait DataSource[Identity, Result]{
def name: String
def fetch[F[_] : ConcurrentEffect : Par](id: Identity): F[Option[Result]]
def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[Identity]): F[Map[Identity, Result]]
def fetch[F[_] : ConcurrentEffect](id: Identity): F[Option[Result]]
def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[Identity]): F[Map[Identity, Result]]
}
```

@@ -68,7 +67,6 @@ We'll implement a dummy data source that can convert integers to strings. For co
```scala
import cats.data.NonEmptyList
import cats.effect._
import cats.temp.par._
import cats.instances.list._
import cats.syntax.all._

@@ -77,13 +75,13 @@ import fetch._
object ToStringSource extends DataSource[Int, String]{
override def name = "ToString"

override def fetch[F[_] : ConcurrentEffect : Par](id: Int): F[Option[String]] = {
override def fetch[F[_] : ConcurrentEffect](id: Int): F[Option[String]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] One ToString $id")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] One ToString $id")) >>
Sync[F].pure(Option(id.toString))
}

override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[Int]): F[Map[Int, String]] = {
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[Int]): F[Map[Int, String]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] Batch ToString $ids")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] Batch ToString $ids")) >>
Sync[F].pure(ids.toList.map(i => (i, i.toString)).toMap)
@@ -127,8 +125,8 @@ import scala.concurrent.duration._
// import scala.concurrent.duration._

Fetch.run[IO](fetchOne).unsafeRunTimed(5.seconds)
// --> [48] One ToString 1
// <-- [48] One ToString 1
// --> [179] One ToString 1
// <-- [179] One ToString 1
// res0: Option[String] = Some(1)
```
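The definition of `fetchOne` is collapsed in this diff view. A hypothetical sketch of such a single-identity fetch under the new API, with the data source passed explicitly and no `Par` constraint, could look like:

```scala
// Hypothetical reconstruction, not the collapsed definition itself.
def fetchOne[F[_] : ConcurrentEffect]: Fetch[F, String] =
  Fetch(1, ToStringSource)
```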

@@ -147,8 +145,8 @@ When executing the above fetch, note how the three identities get batched and th

```scala
Fetch.run[IO](fetchThree).unsafeRunTimed(5.seconds)
// --> [49] Batch ToString NonEmptyList(1, 2, 3)
// <-- [49] Batch ToString NonEmptyList(1, 2, 3)
// --> [179] Batch ToString NonEmptyList(1, 2, 3)
// <-- [179] Batch ToString NonEmptyList(1, 2, 3)
// res1: Option[(String, String, String)] = Some((1,2,3))
```
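`fetchThree` is likewise collapsed here. A sketch of how three independent requests to the same source might be combined applicatively, so that Fetch can batch them into a single `batch` call (assuming the `cats.syntax.all._` import from the snippet above):

```scala
// Hypothetical sketch: independent fetches combined with `tupled` are
// candidates for batching against the same data source.
def fetchThree[F[_] : ConcurrentEffect]: Fetch[F, (String, String, String)] =
  (Fetch(1, ToStringSource), Fetch(2, ToStringSource), Fetch(3, ToStringSource)).tupled
```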

@@ -158,7 +156,7 @@ Note that the `DataSource#batch` method is not mandatory, it will be implemented
object UnbatchedToStringSource extends DataSource[Int, String]{
override def name = "UnbatchedToString"

override def fetch[F[_] : ConcurrentEffect : Par](id: Int): F[Option[String]] = {
override def fetch[F[_] : ConcurrentEffect](id: Int): F[Option[String]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] One UnbatchedToString $id")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] One UnbatchedToString $id")) >>
Sync[F].pure(Option(id.toString))
@@ -180,12 +178,12 @@ When executing the above fetch, note how the three identities get requested in p

```scala
Fetch.run[IO](fetchUnbatchedThree).unsafeRunTimed(5.seconds)
// --> [49] One UnbatchedToString 2
// --> [48] One UnbatchedToString 3
// --> [51] One UnbatchedToString 1
// <-- [48] One UnbatchedToString 3
// <-- [49] One UnbatchedToString 2
// <-- [51] One UnbatchedToString 1
// --> [179] One UnbatchedToString 1
// --> [181] One UnbatchedToString 3
// <-- [181] One UnbatchedToString 3
// --> [182] One UnbatchedToString 2
// <-- [182] One UnbatchedToString 2
// <-- [179] One UnbatchedToString 1
// res2: Option[(String, String, String)] = Some((1,2,3))
```

@@ -197,12 +195,12 @@ If we combine two independent fetches from different data sources, the fetches c
object LengthSource extends DataSource[String, Int]{
override def name = "Length"

override def fetch[F[_] : ConcurrentEffect : Par](id: String): F[Option[Int]] = {
override def fetch[F[_] : ConcurrentEffect](id: String): F[Option[Int]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] One Length $id")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] One Length $id")) >>
Sync[F].pure(Option(id.size))
}
override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[String]): F[Map[String, Int]] = {
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[String]): F[Map[String, Int]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] Batch Length $ids")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] Batch Length $ids")) >>
Sync[F].pure(ids.toList.map(i => (i, i.size)).toMap)
@@ -224,10 +222,10 @@ Note how the two independent data fetches run in parallel, minimizing the latenc

```scala
Fetch.run[IO](fetchMulti).unsafeRunTimed(5.seconds)
// --> [48] One ToString 1
// <-- [48] One ToString 1
// --> [50] One Length one
// <-- [50] One Length one
// --> [181] One ToString 1
// <-- [181] One ToString 1
// --> [180] One Length one
// <-- [180] One Length one
// res3: Option[(String, Int)] = Some((1,3))
```
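`fetchMulti` is collapsed in this diff as well; a hypothetical sketch of combining two fetches against different data sources, which lets them run in parallel:

```scala
// Hypothetical sketch: two independent fetches against different sources.
def fetchMulti[F[_] : ConcurrentEffect]: Fetch[F, (String, Int)] =
  (Fetch(1, ToStringSource), Fetch("one", LengthSource)).tupled
```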

@@ -248,8 +246,8 @@ While running it, notice that the data source is only queried once. The next tim

```scala
Fetch.run[IO](fetchTwice).unsafeRunTimed(5.seconds)
// --> [51] One ToString 1
// <-- [51] One ToString 1
// --> [182] One ToString 1
// <-- [182] One ToString 1
// res4: Option[(String, String)] = Some((1,1))
```
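The collapsed `fetchTwice` presumably requests the same identity twice in sequence; a hedged sketch of that shape, where the second request is served from the cache:

```scala
// Hypothetical sketch: the same identity fetched twice monadically.
// Deduplication means the data source is only hit once.
def fetchTwice[F[_] : ConcurrentEffect]: Fetch[F, (String, String)] = for {
  one     <- Fetch(1, ToStringSource)
  another <- Fetch(1, ToStringSource)
} yield (one, another)
```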

@@ -266,10 +264,11 @@ For more in-depth information take a look at our [documentation](http://47deg.gi
If you wish to add your library here please consider a PR to include it in the list below.

[comment]: # (Start Copyright)

# Copyright

Fetch is designed and developed by 47 Degrees

Copyright (C) 2016-2018 47 Degrees. <http://47deg.com>

[comment]: # (End Copyright)
[comment]: # (End Copyright)
42 changes: 20 additions & 22 deletions docs/src/main/tut/docs.md
@@ -68,16 +68,15 @@ In order to tell Fetch how to retrieve data, we must implement the `DataSource`

```scala
import cats.effect.ConcurrentEffect
import cats.temp.par._
import cats.data.NonEmptyList

trait DataSource[Identity, Result]{
def name: String

def fetch[F[_] : ConcurrentEffect : Par](id: Identity): F[Option[Result]]
def fetch[F[_] : ConcurrentEffect](id: Identity): F[Option[Result]]

/* `batch` is implemented in terms of `fetch` by default */
def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[Identity]): F[Map[Identity, Result]]
def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[Identity]): F[Map[Identity, Result]]
}
```

@@ -121,7 +120,6 @@ def latency[F[_] : ConcurrentEffect, A](result: A, msg: String): F[A] = for {
And now we're ready to write our user data source; we'll emulate a database with an in-memory map.

```tut:silent
import cats.temp.par._
import cats.data.NonEmptyList
import cats.instances.list._
import fetch._
@@ -136,10 +134,10 @@ val userDatabase: Map[UserId, User] = Map(
object UserSource extends DataSource[UserId, User]{
override def name = "User"

override def fetch[F[_] : ConcurrentEffect : Par](id: UserId): F[Option[User]] =
override def fetch[F[_] : ConcurrentEffect](id: UserId): F[Option[User]] =
latency(userDatabase.get(id), s"One User $id")

override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[UserId]): F[Map[UserId, User]] =
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[UserId]): F[Map[UserId, User]] =
latency(userDatabase.filterKeys(ids.toList.toSet), s"Batch Users $ids")
}
```
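The `getUser` helper used by later snippets is collapsed in this diff; a hypothetical sketch of it, with the data source passed explicitly as required by the new `Fetch#apply`:

```scala
// Hypothetical helper, not the collapsed definition itself.
def getUser[F[_] : ConcurrentEffect](id: UserId): Fetch[F, User] =
  Fetch(id, UserSource)
```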
@@ -169,7 +167,7 @@ If your data source doesn't support batching, you can simply leave the `batch` m
implicit object UnbatchedSource extends DataSource[Int, Int]{
override def name = "Unbatched"

override def fetch[F[_] : ConcurrentEffect : Par](id: Int): F[Option[Int]] =
override def fetch[F[_] : ConcurrentEffect](id: Int): F[Option[Int]] =
Sync[F].pure(Option(id))
}
```
@@ -182,10 +180,10 @@ The default `batch` implementation runs requests to the data source in parallel,
object UnbatchedSeqSource extends DataSource[Int, Int]{
override def name = "UnbatchedSeq"

override def fetch[F[_] : ConcurrentEffect : Par](id: Int): F[Option[Int]] =
override def fetch[F[_] : ConcurrentEffect](id: Int): F[Option[Int]] =
Sync[F].pure(Option(id))

override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[Int]): F[Map[Int, Int]] =
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[Int]): F[Map[Int, Int]] =
ids.traverse(
(id) => fetch(id).map(v => (id, v))
).map(_.collect { case (i, Some(x)) => (i, x) }.toMap)
@@ -201,10 +199,10 @@ If your data source only supports querying it in batches, you can implement `fet
object OnlyBatchedSource extends DataSource[Int, Int]{
override def name = "OnlyBatched"

override def fetch[F[_] : ConcurrentEffect : Par](id: Int): F[Option[Int]] =
override def fetch[F[_] : ConcurrentEffect](id: Int): F[Option[Int]] =
batch(NonEmptyList(id, List())).map(_.get(id))

override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[Int]): F[Map[Int, Int]] =
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[Int]): F[Map[Int, Int]] =
Sync[F].pure(ids.map(x => (x, x)).toList.toMap)
}
```
@@ -349,10 +347,10 @@ val postDatabase: Map[PostId, Post] = Map(
implicit object PostSource extends DataSource[PostId, Post]{
override def name = "Post"

override def fetch[F[_] : ConcurrentEffect : Par](id: PostId): F[Option[Post]] =
override def fetch[F[_] : ConcurrentEffect](id: PostId): F[Option[Post]] =
latency(postDatabase.get(id), s"One Post $id")

override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[PostId]): F[Map[PostId, Post]] =
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[PostId]): F[Map[PostId, Post]] =
latency(postDatabase.filterKeys(ids.toList.toSet), s"Batch Posts $ids")
}

@@ -372,12 +370,12 @@ We'll implement a data source for retrieving a post topic given a post id.
implicit object PostTopicSource extends DataSource[Post, PostTopic]{
override def name = "Post topic"

override def fetch[F[_] : ConcurrentEffect : Par](id: Post): F[Option[PostTopic]] = {
override def fetch[F[_] : ConcurrentEffect](id: Post): F[Option[PostTopic]] = {
val topic = if (id.id % 2 == 0) "monad" else "applicative"
latency(Option(topic), s"One Post Topic $id")
}

override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[Post]): F[Map[Post, PostTopic]] = {
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[Post]): F[Map[Post, PostTopic]] = {
val result = ids.toList.map(id => (id, if (id.id % 2 == 0) "monad" else "applicative")).toMap
latency(result, s"Batch Post Topics $ids")
}
@@ -481,7 +479,7 @@ We'll be using the default in-memory cache, prepopulated with some data. The cac
is calculated with the `DataSource`'s `name` method and the request identity.

```tut:silent
def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, Int, User](
def cache[F[_] : ConcurrentEffect] = InMemoryCache.from[F, Int, User](
(UserSource.name, 1) -> User(1, "@dialelo")
)
```
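A sketch of running a fetch against this prepopulated cache (assuming the `Fetch.run` overload that takes a starting cache as its second argument); user `1` is already cached, so the data source should not be queried:

```scala
import cats.effect.IO
import scala.concurrent.duration._

// Assumption: Fetch.run accepts a starting cache as a second argument.
Fetch.run[IO](getUser(1), cache[IO]).unsafeRunTimed(5.seconds)
```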
@@ -545,13 +543,13 @@ case class ForgetfulCache[F[_] : Monad]() extends DataSourceCache[F] {
Applicative[F].pure(None)
}

def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]()
def forgetfulCache[F[_] : ConcurrentEffect] = ForgetfulCache[F]()
```

We can now use our implementation of the cache when running a fetch.

```tut:book
def fetchSameTwice[F[_] : ConcurrentEffect : Par]: Fetch[F, (User, User)] = for {
def fetchSameTwice[F[_] : ConcurrentEffect]: Fetch[F, (User, User)] = for {
one <- getUser(1)
another <- getUser(1)
} yield (one, another)
Expand All @@ -575,10 +573,10 @@ implicit object BatchedUserSource extends DataSource[UserId, User]{

override def maxBatchSize: Option[Int] = Some(2)

override def fetch[F[_] : ConcurrentEffect : Par](id: UserId): F[Option[User]] =
override def fetch[F[_] : ConcurrentEffect](id: UserId): F[Option[User]] =
latency(userDatabase.get(id), s"One User $id")

override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[UserId]): F[Map[UserId, User]] =
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[UserId]): F[Map[UserId, User]] =
latency(userDatabase.filterKeys(ids.toList.toSet), s"Batch Users $ids")
}

Expand Down Expand Up @@ -608,10 +606,10 @@ implicit object SequentialUserSource extends DataSource[UserId, User]{

override def batchExecution: BatchExecution = Sequentially // defaults to `InParallel`

override def fetch[F[_] : ConcurrentEffect : Par](id: UserId): F[Option[User]] =
override def fetch[F[_] : ConcurrentEffect](id: UserId): F[Option[User]] =
latency(userDatabase.get(id), s"One User $id")

override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[UserId]): F[Map[UserId, User]] =
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[UserId]): F[Map[UserId, User]] =
latency(userDatabase.filterKeys(ids.toList.toSet), s"Batch Users $ids")
}

16 changes: 7 additions & 9 deletions docs/src/main/tut/index.md
@@ -55,12 +55,11 @@ Data Sources take two type parameters:
```scala
import cats.data.NonEmptyList
import cats.effect.ConcurrentEffect
import cats.temp.par.Par

trait DataSource[Identity, Result]{
def name: String
def fetch[F[_] : ConcurrentEffect : Par](id: Identity): F[Option[Result]]
def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[Identity]): F[Map[Identity, Result]]
def fetch[F[_] : ConcurrentEffect](id: Identity): F[Option[Result]]
def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[Identity]): F[Map[Identity, Result]]
}
```

@@ -71,7 +70,6 @@ We'll implement a dummy data source that can convert integers to strings. For co
```tut:silent
import cats.data.NonEmptyList
import cats.effect._
import cats.temp.par._
import cats.instances.list._
import cats.syntax.all._

@@ -80,13 +78,13 @@ import fetch._
object ToStringSource extends DataSource[Int, String]{
override def name = "ToString"

override def fetch[F[_] : ConcurrentEffect : Par](id: Int): F[Option[String]] = {
override def fetch[F[_] : ConcurrentEffect](id: Int): F[Option[String]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] One ToString $id")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] One ToString $id")) >>
Sync[F].pure(Option(id.toString))
}

override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[Int]): F[Map[Int, String]] = {
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[Int]): F[Map[Int, String]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] Batch ToString $ids")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] Batch ToString $ids")) >>
Sync[F].pure(ids.toList.map(i => (i, i.toString)).toMap)
@@ -154,7 +152,7 @@ Note that the `DataSource#batch` method is not mandatory, it will be implemented
object UnbatchedToStringSource extends DataSource[Int, String]{
override def name = "UnbatchedToString"

override def fetch[F[_] : ConcurrentEffect : Par](id: Int): F[Option[String]] = {
override def fetch[F[_] : ConcurrentEffect](id: Int): F[Option[String]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] One UnbatchedToString $id")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] One UnbatchedToString $id")) >>
Sync[F].pure(Option(id.toString))
@@ -186,12 +184,12 @@ If we combine two independent fetches from different data sources, the fetches c
object LengthSource extends DataSource[String, Int]{
override def name = "Length"

override def fetch[F[_] : ConcurrentEffect : Par](id: String): F[Option[Int]] = {
override def fetch[F[_] : ConcurrentEffect](id: String): F[Option[Int]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] One Length $id")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] One Length $id")) >>
Sync[F].pure(Option(id.size))
}
override def batch[F[_] : ConcurrentEffect : Par](ids: NonEmptyList[String]): F[Map[String, Int]] = {
override def batch[F[_] : ConcurrentEffect](ids: NonEmptyList[String]): F[Map[String, Int]] = {
Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] Batch Length $ids")) >>
Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] Batch Length $ids")) >>
Sync[F].pure(ids.toList.map(i => (i, i.size)).toMap)