From c93b4241f1e86d890182aef25522e2f0a18b687a Mon Sep 17 00:00:00 2001 From: Ihor Vovk Date: Sun, 10 Nov 2024 21:17:12 +0100 Subject: [PATCH] Add client for http4s library (#3118) * Add client for http4s library * add import * Rename authorization => authentication * Support ApiKey auth scheme * Remove default authentication * Add some tests * Remove authentication from the client after the generic auth method was merged * Use latest http4s version * Address PR's comments * delete flaky test * Revert "delete flaky test" This reverts commit a7f87624b7c49eb35886c63d33364e93cdba2144. --------- Co-authored-by: Philippus Baalman --- build.sbt | 7 +++ .../http4s/Elastic4sEntityEncoders.scala | 40 ++++++++++++++ .../elastic4s/http4s/Http4sClient.scala | 55 +++++++++++++++++++ .../http4s/RequestResponseConverters.scala | 52 ++++++++++++++++++ .../http4s/Http4sRequestHttpClientTest.scala | 35 ++++++++++++ project/Dependencies.scala | 4 ++ 6 files changed, 193 insertions(+) create mode 100644 elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Elastic4sEntityEncoders.scala create mode 100644 elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Http4sClient.scala create mode 100644 elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/RequestResponseConverters.scala create mode 100644 elastic4s-client-http4s/src/test/scala/com/sksamuel/elastic4s/http4s/Http4sRequestHttpClientTest.scala diff --git a/build.sbt b/build.sbt index b7dd0aee8..68a8bb15b 100644 --- a/build.sbt +++ b/build.sbt @@ -101,6 +101,7 @@ lazy val scala3Projects: Seq[ProjectReference] = Seq( clientesjava, clientsSniffed, clientpekko, + clienthttp4s, cats_effect, cats_effect_2, zio_1, @@ -316,6 +317,12 @@ lazy val clientpekko = (project in file("elastic4s-client-pekko")) .settings(scala3Settings) .settings(libraryDependencies ++= Seq(pekkoHTTP, pekkoStream)) +lazy val clienthttp4s = (project in file("elastic4s-client-http4s")) + .dependsOn(core, testkit % "test") + .settings(name := "elastic4s-client-http4s") + .settings(scala3Settings) + .settings(libraryDependencies ++= Seq(http4sClient, http4sEmberClient % Test)) + lazy val tests = (project in file("elastic4s-tests")) .settings(name := "elastic4s-tests") diff --git a/elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Elastic4sEntityEncoders.scala b/elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Elastic4sEntityEncoders.scala new file mode 100644 index 000000000..1b7631b34 --- /dev/null +++ b/elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Elastic4sEntityEncoders.scala @@ -0,0 +1,40 @@ +package com.sksamuel.elastic4s.http4s + +import cats.effect.Sync +import com.sksamuel.elastic4s +import fs2.io.file.{Files, Path} +import org.http4s + +import java.io.InputStream +import scala.language.higherKinds + +trait Elastic4sEntityEncoders { + + implicit def elasticEntityEncoder[F[_] : Sync : Files]: http4s.EntityEncoder[F, elastic4s.HttpEntity] = + new http4s.EntityEncoder[F, elastic4s.HttpEntity] { + override def toEntity(a: elastic4s.HttpEntity): http4s.Entity[F] = { + a match { + case elastic4s.HttpEntity.StringEntity(str, _) => + http4s.EntityEncoder.stringEncoder[F].toEntity(str) + case elastic4s.HttpEntity.InputStreamEntity(is, _) => + http4s.EntityEncoder.inputStreamEncoder[F, InputStream].toEntity(Sync[F].pure(is)) + case elastic4s.HttpEntity.FileEntity(file, _) => + http4s.EntityEncoder.pathEncoder[F].toEntity(Path.fromNioPath(file.toPath)) + case elastic4s.HttpEntity.ByteArrayEntity(arr, _) => + http4s.EntityEncoder.byteArrayEncoder[F].toEntity(arr) + } + } + + override def headers: http4s.Headers = http4s.Headers.empty + } + + implicit def optionalEntityEncoder[F[_], A](implicit ee: http4s.EntityEncoder[F, A]): http4s.EntityEncoder[F, Option[A]] = + new http4s.EntityEncoder[F, Option[A]] { + override def toEntity(a: Option[A]): http4s.Entity[F] = { + a.fold[http4s.Entity[F]](http4s.Entity.empty)(ee.toEntity) + } + + override def headers: http4s.Headers = http4s.Headers.empty + } + +} diff --git a/elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Http4sClient.scala b/elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Http4sClient.scala new file mode 100644 index 000000000..185936f2f --- /dev/null +++ b/elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Http4sClient.scala @@ -0,0 +1,55 @@ +package com.sksamuel.elastic4s.http4s + +import cats.effect.unsafe.IORuntime +import cats.effect.{Async, IO} +import com.sksamuel.elastic4s +import com.sksamuel.elastic4s.ElasticNodeEndpoint +import fs2.io.file.Files +import org.http4s + +import scala.language.higherKinds + +// class to support callback interface +trait CallbackRunner[F[_]] { + def run[A](fa: F[A], cb: Either[Throwable, A] => Unit): Unit +} + +object Http4sClient { + + def usingIO( + client: http4s.client.Client[IO], + endpoint: ElasticNodeEndpoint, + )(implicit runtime: IORuntime): Http4sClient[IO] = { + val ioRunner = new CallbackRunner[IO] { + override def run[A](fa: IO[A], cb: Either[Throwable, A] => Unit): Unit = fa.unsafeRunAsync(cb) + } + + new Http4sClient( + client = client, + endpoint = endpoint, + runner = ioRunner + ) + } + +} + +class Http4sClient[F[_] : Async : Files]( + client: http4s.client.Client[F], + endpoint: ElasticNodeEndpoint, + runner: CallbackRunner[F], +) extends elastic4s.HttpClient with RequestResponseConverters { + + override def send( + request: elastic4s.ElasticRequest, + callback: Either[Throwable, elastic4s.HttpResponse] => Unit + ): Unit = { + val http4sRequest = elasticRequestToHttp4sRequest[F](endpoint, request) + + val response = client.run(http4sRequest).use(http4sResponseToElasticResponse[F]) + + runner.run(response, callback) + } + + // Instantiation of the http4s client happens by the Resource monad, so closing should be managed by it as well + override def close(): Unit = () +} diff --git a/elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/RequestResponseConverters.scala b/elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/RequestResponseConverters.scala new file mode 100644 index 000000000..eaf56b1b3 --- /dev/null +++ b/elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/RequestResponseConverters.scala @@ -0,0 +1,52 @@ +package com.sksamuel.elastic4s.http4s + +import cats.effect.Async +import cats.syntax.all._ +import com.sksamuel.elastic4s +import com.sksamuel.elastic4s.ElasticNodeEndpoint +import fs2.io.file.Files +import org.http4s + +import scala.language.higherKinds + +trait RequestResponseConverters extends Elastic4sEntityEncoders { + + def elasticRequestToHttp4sRequest[F[_] : Async : Files]( + endpoint: ElasticNodeEndpoint, + request: elastic4s.ElasticRequest, + ): http4s.Request[F] = { + val uri = http4s.Uri( + scheme = http4s.Uri.Scheme.fromString(endpoint.protocol).toOption, + authority = http4s.Uri.Authority( + host = http4s.Uri.RegName(endpoint.host), + port = endpoint.port.some + ).some, + path = http4s.Uri.Path(request.endpoint.stripPrefix("/").split('/').map(http4s.Uri.Path.Segment(_)).toVector), + query = http4s.Query.fromPairs(request.params.toList: _*) + ) + + http4s.Request[F]() + .withUri(uri) + .withMethod(http4s.Method.fromString(request.method).valueOr(throw _)) + .withHeaders(http4s.Headers(request.headers.toList).put()) + .withEntity(request.entity) + .withContentTypeOption( + request.entity.flatMap(_.contentCharset).map(http4s.headers.`Content-Type`.parse(_).valueOr(throw _)) + ) + } + + def http4sResponseToElasticResponse[F[_] : Async]( + response: http4s.Response[F] + ): F[elastic4s.HttpResponse] = { + for { + body <- response.body + .through(fs2.text.utf8.decode) + .compile.string + } yield elastic4s.HttpResponse( + statusCode = response.status.code, + entity = Some(elastic4s.HttpEntity.StringEntity(body, None)), + headers = response.headers.headers.map { h => h.name.toString -> h.value }.toMap, + ) + } + +} diff --git a/elastic4s-client-http4s/src/test/scala/com/sksamuel/elastic4s/http4s/Http4sRequestHttpClientTest.scala b/elastic4s-client-http4s/src/test/scala/com/sksamuel/elastic4s/http4s/Http4sRequestHttpClientTest.scala new file mode 100644 index 000000000..676edab74 --- /dev/null +++ b/elastic4s-client-http4s/src/test/scala/com/sksamuel/elastic4s/http4s/Http4sRequestHttpClientTest.scala @@ -0,0 +1,35 @@ +package com.sksamuel.elastic4s.http4s + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import com.sksamuel.elastic4s.testkit.DockerTests +import com.sksamuel.elastic4s.{Authentication, CommonRequestOptions, ElasticClient, ElasticNodeEndpoint} +import org.http4s.ember.client.EmberClientBuilder +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class Http4sRequestHttpClientTest extends AnyFlatSpec with Matchers with DockerTests { + private val http4s = EmberClientBuilder.default[IO].build.allocated.unsafeRunSync()._1 + private val http4sClient = Http4sClient.usingIO( + http4s, + ElasticNodeEndpoint("http", elasticHost, elasticPort.toInt, None), + ) + override val client: ElasticClient = ElasticClient(http4sClient) + + "Http4sRequestHttpClient" should "be able to call elasticsearch" in { + client.execute { + catHealth() + }.await.result.status shouldBe "green" + } + + it should "be able to propagate headers if included" in { + implicit val options: CommonRequestOptions = CommonRequestOptions.defaults.copy( + authentication = Authentication.UsernamePassword("user123", "pass123") + ) + + client.execute { + catHealth() + }.await.result.status shouldBe "401" + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1e75847f5..700efaf7e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,6 +9,8 @@ object Dependencies { val CirceVersion = "0.14.10" val CommonsIoVersion = "2.17.0" val ElasticsearchVersion = "8.15.3" + val ExtsVersion = "1.61.1" + val Http4sVersion = "0.23.29" val JacksonVersion = "2.18.1" val Json4sVersion = "4.0.7" val Log4jVersion = "2.24.1" @@ -62,6 +64,8 @@ object Dependencies { lazy val cats = "org.typelevel" %% "cats-effect" % CatsEffectVersion lazy val cats2 = "org.typelevel" %% "cats-effect" % CatsEffect2Version lazy val elasticsearchRestClient = "org.elasticsearch.client" % "elasticsearch-rest-client" % ElasticsearchVersion + lazy val http4sClient = "org.http4s" %% "http4s-client" % Http4sVersion + lazy val http4sEmberClient = "org.http4s" %% "http4s-ember-client" % Http4sVersion lazy val json4s = Seq("org.json4s" %% "json4s-core" % Json4sVersion, "org.json4s" %% "json4s-jackson" % Json4sVersion) lazy val monix = "io.monix" %% "monix" % MonixVersion lazy val pekkoActor = "org.apache.pekko" %% "pekko-actor" % PekkoVersion