Skip to content

Commit

Permalink
Merge pull request #6 from davenverse/catsEffect3
Browse files Browse the repository at this point in the history
Cats-Effect 3 Upgrade
  • Loading branch information
ChristopherDavenport authored Aug 13, 2021
2 parents ba15dea + 7e28cc3 commit 697c33b
Show file tree
Hide file tree
Showing 18 changed files with 133 additions and 237 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [2.13.4]
scala: [2.12.14, 2.13.4]
java: [adopt@1.8, adopt@1.11]
runs-on: ${{ matrix.os }}
steps:
Expand Down
32 changes: 15 additions & 17 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
val catsV = "2.3.0"
val catsEffectV = "2.3.1"
val shapelessV = "2.3.3"
val fs2V = "2.4.6"
val http4sV = "0.21.15"
val circeV = "0.13.0"
val doobieV = "0.9.4"
val log4catsV = "1.1.1"
val mUnitV = "0.7.20"
val catsV = "2.6.1"
val catsEffectV = "3.2.2"
val fs2V = "3.1.0"
val circeV = "0.14.1"

ThisBuild / testFrameworks += new TestFramework("munit.Framework")

ThisBuild / crossScalaVersions := Seq("2.13.4")
ThisBuild / crossScalaVersions := Seq("2.12.14", "2.13.4")

// Projects
lazy val `rediculous-concurrent` = project.in(file("."))
Expand All @@ -31,20 +26,23 @@ lazy val core = project.in(file("core"))
"io.circe" %% "circe-core" % circeV,
"io.circe" %% "circe-parser" % circeV,

"io.chrisdavenport" %% "rediculous" % "0.0.11",
"io.chrisdavenport" %% "mapref" % "0.1.1",
"io.chrisdavenport" %% "circuit" % "0.4.3",
"io.chrisdavenport" %% "mules" % "0.4.0",
"io.chrisdavenport" %% "rediculous" % "0.1.0",
"io.chrisdavenport" %% "mapref" % "0.2.0-M2",
"io.chrisdavenport" %% "circuit" % "0.5.0-M1",
"io.chrisdavenport" %% "mules" % "0.5.0-M1",

// Deps we may use in the future, but don't need presently.
// "io.circe" %% "circe-generic" % circeV,
// "io.chrisdavenport" %% "log4cats-core" % log4catsV,
// "io.chrisdavenport" %% "log4cats-slf4j" % log4catsV,
// "io.chrisdavenport" %% "log4cats-testing" % log4catsV % Test,
"org.scalameta" %% "munit" % mUnitV % Test,
"org.typelevel" %% "munit-cats-effect-2" % "0.12.0" % Test,
"org.typelevel" %% "munit-cats-effect-3" % "1.0.5" % Test,
"com.dimafeng" %% "testcontainers-scala" % "0.38.8" % Test
)
),
scalacOptions ++= {
if (scalaVersion.value.startsWith("2.12")) Seq("-Ypartial-unification")
else Seq()
}
)

lazy val examples = project.in(file("examples"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ object RedisCache {

}

def instance[F[_]: Concurrent](
def instance[F[_]: Async](
connection: RedisConnection[F],
namespace: String,
setOpts: RedisCommands.SetOpts
): Cache[F, String, String] = new RedisCacheBase[F](connection, namespace, setOpts)

private class RedisCacheBase[F[_]: Concurrent](
private class RedisCacheBase[F[_]: Async](
connection: RedisConnection[F],
namespace: String,
setOpts: RedisCommands.SetOpts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ import io.circe.syntax._
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit
import cats.effect._
import cats.effect.concurrent.Ref
import io.chrisdavenport.rediculous.{RedisCommands,RedisConnection}

object RedisCircuit {

def circuitAtLocation[F[_]: Concurrent: Timer](
def circuitAtLocation[F[_]: Async](
redisConnection: RedisConnection[F],
key: String,
acquireTimeout: FiniteDuration = 5.seconds,
Expand All @@ -37,7 +36,7 @@ object RedisCircuit {
CircuitBreaker.unsafe(ref, maxFailures, resetTimeout, exponentialBackoffFactor, maxResetTimeout, Applicative[F].unit, Applicative[F].unit, Applicative[F].unit, Applicative[F].unit)
}

def keyCircuit[F[_]: Concurrent: Timer](
def keyCircuit[F[_]: Async](
redisConnection: RedisConnection[F],
acquireTimeout: FiniteDuration = 5.seconds,
lockDuration: FiniteDuration = 10.seconds,
Expand All @@ -47,7 +46,7 @@ object RedisCircuit {
exponentialBackoffFactor: Double,
maxResetTimeout: Duration
): String => CircuitBreaker[F] = {
val base: RedisMapRef[F] = RedisMapRef.impl[F](redisConnection, acquireTimeout, lockDuration, setOpts)(Concurrent[F], Timer[F])
val base: RedisMapRef[F] = RedisMapRef.impl[F](redisConnection, acquireTimeout, lockDuration, setOpts)
val closed: String = (CircuitBreaker.Closed(0): State).asJson.noSpaces

{key: String =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.chrisdavenport.rediculous.concurrent
import cats._
import cats.syntax.all._
import cats.effect._
import cats.effect.concurrent._
import io.chrisdavenport.rediculous._
import io.circe._
import io.circe.syntax._
Expand Down Expand Up @@ -37,7 +36,7 @@ abstract class CountDownLatch[F[_]] { self =>

object RedisCountdownLatch {

def createOrAccess[F[_]: Concurrent: Timer](
def createOrAccess[F[_]: Async](
redisConnection: RedisConnection[F],
key: String,
latches: Int,
Expand All @@ -56,7 +55,7 @@ object RedisCountdownLatch {
.as(new ConcurrentCountDownLatch[F](ref, pollingInterval, redisConnection, key, deferredLifetime))
}

def accessAtKey[F[_]: Concurrent: Timer](
def accessAtKey[F[_]: Async](
redisConnection: RedisConnection[F],
key: String,
acquireTimeout: FiniteDuration,
Expand All @@ -76,7 +75,7 @@ object RedisCountdownLatch {
)
}

private class ConcurrentCountDownLatch[F[_]: Concurrent: Timer](
private class ConcurrentCountDownLatch[F[_]: Async](
state: Ref[F, State],
pollingInterval: FiniteDuration,
redisConnection: RedisConnection[F],
Expand All @@ -86,7 +85,7 @@ object RedisCountdownLatch {
extends CountDownLatch[F] {

override def release: F[Unit] =
Concurrent[F].uncancelable {
Concurrent[F].uncancelable {_ =>
state.modify {
case Awaiting(n, signal) =>
if (n > 1) (Awaiting(n - 1, signal), Applicative[F].unit) else (Done(), RedisDeferred.fromKey(redisConnection, signal, pollingInterval, lifetime).complete(keyLocation).void)
Expand All @@ -102,35 +101,35 @@ object RedisCountdownLatch {

}

private class PossiblyAbsentCountdownLatch[F[_]: Concurrent: Timer](
private class PossiblyAbsentCountdownLatch[F[_]: Async](
state: Ref[F, Option[State]],
pollingInterval: FiniteDuration,
redisConnection: RedisConnection[F],
keyLocation: String,
lifetime: FiniteDuration
) extends CountDownLatch[F] {
override def release: F[Unit] =
Concurrent[F].uncancelable {
Concurrent[F].uncancelable {_ =>
state.modify {
case Some(Awaiting(n, signal)) =>
if (n > 1) (Awaiting(n - 1, signal).some, false.pure[F]) else (Done().some, RedisDeferred.fromKey(redisConnection, signal, pollingInterval, lifetime).complete(keyLocation).void.as(false))
case Some(d @ Done()) => (d.some, false.pure[F])
case None => (None, true.pure[F])
}.flatten
}.ifM(
Timer[F].sleep(pollingInterval) >> release,
Temporal[F].sleep(pollingInterval) >> release,
Applicative[F].unit
)

override def await: F[Unit] =
state.get.flatMap {
case Some(Awaiting(_, signal)) => RedisDeferred.fromKey(redisConnection, signal, pollingInterval, lifetime).get.void
case Some(Done()) => Applicative[F].unit
case None => Timer[F].sleep(pollingInterval) >> await
case None => Temporal[F].sleep(pollingInterval) >> await
}
}

def stateAtLocation[F[_]: Concurrent: Timer](
def stateAtLocation[F[_]: Async](
redisConnection: RedisConnection[F],
key: String,
acquireTimeout: FiniteDuration,
Expand All @@ -151,31 +150,15 @@ object RedisCountdownLatch {
default: A
): Deferred[F, Unit] = new TranslatedDeferred[F, A](tryAble, default)

def liftTryableDeferred[F[_]: Functor, A](
tryAble: TryableDeferred[F, A],
default: A
): TryableDeferred[F, Unit] = new TranslatedTryDeferred[F, A](tryAble, default)

class TranslatedTryDeferred[F[_]: Functor, A](
val tryAble: TryableDeferred[F, A],
val default: A
) extends TryableDeferred[F, Unit]{
def complete(a: Unit): F[Unit] =
tryAble.complete(default)
def get: F[Unit] =
tryAble.get.void
def tryGet: F[Option[Unit]] =
tryAble.tryGet.map(_.void)
}

class TranslatedDeferred[F[_]: Functor, A](
val tryAble: Deferred[F, A],
val default: A
) extends Deferred[F, Unit]{
def complete(a: Unit): F[Unit] =
def complete(a: Unit): F[Boolean] =
tryAble.complete(default)
def get: F[Unit] =
tryAble.get.void
def tryGet: F[Option[Unit]] = tryAble.tryGet.map(_.void)
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.chrisdavenport.rediculous.concurrent
import cats._
import cats.syntax.all._
import cats.effect._
import cats.effect.concurrent._
import cats.effect.syntax.all._
import io.circe._
import io.circe.syntax._
Expand Down Expand Up @@ -36,7 +35,7 @@ trait CyclicBarrier[F[_]]{ self =>

object RedisCyclicBarrier {

def create[F[_]: Concurrent: Timer](
def create[F[_]: Async](
redisConnection: RedisConnection[F],
key: String,
capacity: Int,
Expand All @@ -52,7 +51,7 @@ object RedisCyclicBarrier {
case class State(awaiting: Int, epoch: Long, currentDeferredLocation: String)


private class RedisCyclicBarrier[F[_]: Concurrent : Timer](
private class RedisCyclicBarrier[F[_]: Async](
redisConnection: RedisConnection[F],
key: String,
capacity: Int,
Expand Down Expand Up @@ -91,7 +90,7 @@ object RedisCyclicBarrier {
}

Some(newState) -> deferredAtLocation(location).get.void.guaranteeCase{
case ExitCase.Canceled => cleanup
case Outcome.Canceled() => cleanup
case _ => Applicative[F].unit
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.chrisdavenport.rediculous.concurrent
import io.chrisdavenport.rediculous._
import cats.effect._
import cats.data.NonEmptyList
import cats.effect.concurrent._
import cats.syntax.all._
import scala.concurrent.duration.FiniteDuration
import io.chrisdavenport.rediculous.RedisCommands.Condition
Expand All @@ -17,54 +16,53 @@ object RedisDeferred {
/**
* Creates A Unique Deferred, returning the key and a deferred instance that can be used.
*/
def create[F[_]: Concurrent: Timer](
def create[F[_]: Async](
redisConnection: RedisConnection[F],
pollingInterval: FiniteDuration,
lifetime: FiniteDuration
): F[(String, TryableDeferred[F, String])] = Sync[F].delay(java.util.UUID.randomUUID()).map{identifier =>
): F[(String, Deferred[F, String])] = Sync[F].delay(java.util.UUID.randomUUID()).map{identifier =>
val key = s"deferred:${identifier}"
(key, fromKey(redisConnection, key, pollingInterval, lifetime))
}

def fromKey[F[_]: Concurrent: Timer](
def fromKey[F[_]: Async](
redisConnection: RedisConnection[F],
keyLocation: String,
pollingInterval: FiniteDuration,
lifetime: FiniteDuration
): TryableDeferred[F, String] = new LocationDeferredRef[F](redisConnection, keyLocation, pollingInterval, lifetime)
): Deferred[F, String] = new LocationDeferredRef[F](redisConnection, keyLocation, pollingInterval, lifetime)

class LocationDeferredRef[F[_]: Concurrent: Timer](
class LocationDeferredRef[F[_]: Async](
redisConnection: RedisConnection[F],
keyLocation: String,
pollingInterval: FiniteDuration,
lifetime: FiniteDuration
) extends TryableDeferred[F, String] {
) extends Deferred[F, String] {

def tryGet: F[Option[String]] =
RedisCommands.get(keyLocation).run(redisConnection)

def get: F[String] =
RedisCommands.get(keyLocation).run(redisConnection).flatMap{
case None => Timer[F].sleep(pollingInterval) >> get
case None => Temporal[F].sleep(pollingInterval) >> get
case Some(a) => a.pure[F]
}

def complete(a: String): F[Unit] = {
def complete(a: String): F[Boolean] = {
// val ctxState = RedisResult.option[RedisProtocol.Status](RedisResult.status)
RedisCtx[Redis[F, *]].keyed[Option[RedisProtocol.Status]](keyLocation, NonEmptyList.of("SET", keyLocation, a, "PX", lifetime.toMillis.toString(), "NX"))
// RedisCommands.set(keyLocation, a, RedisCommands.SetOpts(None, Some(lifetime.toMillis), Some(Condition.Nx), false))
.run(redisConnection)
.flatMap{
case None => Timer[F].sleep(pollingInterval) >> complete(a)
case Some(Ok) => Applicative[F].unit
case Some(Pong) => Concurrent[F].raiseError[Unit](
case None => Temporal[F].sleep(pollingInterval) >> complete(a)
case Some(Ok) => Applicative[F].pure(true)
case Some(Pong) => Concurrent[F].raiseError[Boolean](
new IllegalStateException("Attempting to complete a Deferred got Pong: should never arrive here")
)
case Some(Status(getStatus)) => Concurrent[F].raiseError[Unit](
new IllegalStateException("Attempting to complete a Deferred that has already been completed")
)
case Some(Status(getStatus)) => Concurrent[F].pure(false)
}
}



}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import java.util.UUID

object RedisLock {

def tryAcquireLock[F[_]: Concurrent: Timer](
def tryAcquireLock[F[_]: Async](
connection: RedisConnection[F],
lockname: String,
acquireTimeout: FiniteDuration,
Expand All @@ -35,10 +35,10 @@ object RedisLock {
case None => None
}
} yield out
Concurrent.timeout(create, acquireTimeout)
Concurrent[F].timeout(create, acquireTimeout)
}

def shutdownLock[F[_]: Concurrent](
def shutdownLock[F[_]: Async](
connection: RedisConnection[F],
lockname: String,
identifier: UUID
Expand All @@ -60,7 +60,7 @@ object RedisLock {
}
}

def tryAcquireLockWithTimeout[F[_]: Concurrent: Timer](
def tryAcquireLockWithTimeout[F[_]: Async](
connection: RedisConnection[F],
lockname: String,
acquireTimeout: FiniteDuration,
Expand Down Expand Up @@ -99,11 +99,11 @@ object RedisLock {
case None => Applicative[F].unit
}

val createWithTimeout = Concurrent.timeout(create, acquireTimeout)
val createWithTimeout = Concurrent[F].timeout(create, acquireTimeout)
Resource.make(createWithTimeout)(shutdown).map(_.isDefined)
}

def acquireLockWithTimeout[F[_]: Concurrent: Timer](
def acquireLockWithTimeout[F[_]: Async](
connection: RedisConnection[F],
lockname: String,
acquireTimeout: FiniteDuration,
Expand Down
Loading

0 comments on commit 697c33b

Please sign in to comment.