-
Notifications
You must be signed in to change notification settings - Fork 694
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add client for http4s library (#3118)
* Add client for http4s library * add import * Rename authorization => authentication * Support ApiKey auth scheme * Remove default authentication * Add some tests * Remove authentication from the client after the generic auth method was merged * Use latest http4s version * Address PR's comments * delete flaky test * Revert "delete flaky test" This reverts commit a7f8762. --------- Co-authored-by: Philippus Baalman <philippus@gmail.com>
- Loading branch information
Showing
6 changed files
with
193 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
40 changes: 40 additions & 0 deletions
40
...-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Elastic4sEntityEncoders.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package com.sksamuel.elastic4s.http4s | ||
|
||
import cats.effect.Sync | ||
import com.sksamuel.elastic4s | ||
import fs2.io.file.{Files, Path} | ||
import org.http4s | ||
|
||
import java.io.InputStream | ||
import scala.language.higherKinds | ||
|
||
trait Elastic4sEntityEncoders { | ||
|
||
implicit def elasticEntityEncoder[F[_] : Sync : Files]: http4s.EntityEncoder[F, elastic4s.HttpEntity] = | ||
new http4s.EntityEncoder[F, elastic4s.HttpEntity] { | ||
override def toEntity(a: elastic4s.HttpEntity): http4s.Entity[F] = { | ||
a match { | ||
case elastic4s.HttpEntity.StringEntity(str, _) => | ||
http4s.EntityEncoder.stringEncoder[F].toEntity(str) | ||
case elastic4s.HttpEntity.InputStreamEntity(is, _) => | ||
http4s.EntityEncoder.inputStreamEncoder[F, InputStream].toEntity(Sync[F].pure(is)) | ||
case elastic4s.HttpEntity.FileEntity(file, _) => | ||
http4s.EntityEncoder.pathEncoder[F].toEntity(Path.fromNioPath(file.toPath)) | ||
case elastic4s.HttpEntity.ByteArrayEntity(arr, _) => | ||
http4s.EntityEncoder.byteArrayEncoder[F].toEntity(arr) | ||
} | ||
} | ||
|
||
override def headers: http4s.Headers = http4s.Headers.empty | ||
} | ||
|
||
implicit def optionalEntityEncoder[F[_], A](implicit ee: http4s.EntityEncoder[F, A]): http4s.EntityEncoder[F, Option[A]] = | ||
new http4s.EntityEncoder[F, Option[A]] { | ||
override def toEntity(a: Option[A]): http4s.Entity[F] = { | ||
a.fold[http4s.Entity[F]](http4s.Entity.empty)(ee.toEntity) | ||
} | ||
|
||
override def headers: http4s.Headers = http4s.Headers.empty | ||
} | ||
|
||
} |
55 changes: 55 additions & 0 deletions
55
elastic4s-client-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/Http4sClient.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package com.sksamuel.elastic4s.http4s | ||
|
||
import cats.effect.unsafe.IORuntime | ||
import cats.effect.{Async, IO} | ||
import com.sksamuel.elastic4s | ||
import com.sksamuel.elastic4s.ElasticNodeEndpoint | ||
import fs2.io.file.Files | ||
import org.http4s | ||
|
||
import scala.language.higherKinds | ||
|
||
// class to support callback interface | ||
trait CallbackRunner[F[_]] { | ||
def run[A](fa: F[A], cb: Either[Throwable, A] => Unit): Unit | ||
} | ||
|
||
object Http4sClient { | ||
|
||
def usingIO( | ||
client: http4s.client.Client[IO], | ||
endpoint: ElasticNodeEndpoint, | ||
)(implicit runtime: IORuntime): Http4sClient[IO] = { | ||
val ioRunner = new CallbackRunner[IO] { | ||
override def run[A](fa: IO[A], cb: Either[Throwable, A] => Unit): Unit = fa.unsafeRunAsync(cb) | ||
} | ||
|
||
new Http4sClient( | ||
client = client, | ||
endpoint = endpoint, | ||
runner = ioRunner | ||
) | ||
} | ||
|
||
} | ||
|
||
class Http4sClient[F[_] : Async : Files]( | ||
client: http4s.client.Client[F], | ||
endpoint: ElasticNodeEndpoint, | ||
runner: CallbackRunner[F], | ||
) extends elastic4s.HttpClient with RequestResponseConverters { | ||
|
||
override def send( | ||
request: elastic4s.ElasticRequest, | ||
callback: Either[Throwable, elastic4s.HttpResponse] => Unit | ||
): Unit = { | ||
val http4sRequest = elasticRequestToHttp4sRequest[F](endpoint, request) | ||
|
||
val response = client.run(http4sRequest).use(http4sResponseToElasticResponse[F]) | ||
|
||
runner.run(response, callback) | ||
} | ||
|
||
// Instantiation of the http4s client happens by the Resource monad, so closing should be managed by it as well | ||
override def close(): Unit = () | ||
} |
52 changes: 52 additions & 0 deletions
52
...lient-http4s/src/main/scala/com/sksamuel/elastic4s/http4s/RequestResponseConverters.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package com.sksamuel.elastic4s.http4s | ||
|
||
import cats.effect.Async | ||
import cats.syntax.all._ | ||
import com.sksamuel.elastic4s | ||
import com.sksamuel.elastic4s.ElasticNodeEndpoint | ||
import fs2.io.file.Files | ||
import org.http4s | ||
|
||
import scala.language.higherKinds | ||
|
||
trait RequestResponseConverters extends Elastic4sEntityEncoders { | ||
|
||
def elasticRequestToHttp4sRequest[F[_] : Async : Files]( | ||
endpoint: ElasticNodeEndpoint, | ||
request: elastic4s.ElasticRequest, | ||
): http4s.Request[F] = { | ||
val uri = http4s.Uri( | ||
scheme = http4s.Uri.Scheme.fromString(endpoint.protocol).toOption, | ||
authority = http4s.Uri.Authority( | ||
host = http4s.Uri.RegName(endpoint.host), | ||
port = endpoint.port.some | ||
).some, | ||
path = http4s.Uri.Path(request.endpoint.stripPrefix("/").split('/').map(http4s.Uri.Path.Segment(_)).toVector), | ||
query = http4s.Query.fromPairs(request.params.toList: _*) | ||
) | ||
|
||
http4s.Request[F]() | ||
.withUri(uri) | ||
.withMethod(http4s.Method.fromString(request.method).valueOr(throw _)) | ||
.withHeaders(http4s.Headers(request.headers.toList).put()) | ||
.withEntity(request.entity) | ||
.withContentTypeOption( | ||
request.entity.flatMap(_.contentCharset).map(http4s.headers.`Content-Type`.parse(_).valueOr(throw _)) | ||
) | ||
} | ||
|
||
def http4sResponseToElasticResponse[F[_] : Async]( | ||
response: http4s.Response[F] | ||
): F[elastic4s.HttpResponse] = { | ||
for { | ||
body <- response.body | ||
.through(fs2.text.utf8.decode) | ||
.compile.string | ||
} yield elastic4s.HttpResponse( | ||
statusCode = response.status.code, | ||
entity = Some(elastic4s.HttpEntity.StringEntity(body, None)), | ||
headers = response.headers.headers.map { h => h.name.toString -> h.value }.toMap, | ||
) | ||
} | ||
|
||
} |
35 changes: 35 additions & 0 deletions
35
...ent-http4s/src/test/scala/com/sksamuel/elastic4s/http4s/Http4sRequestHttpClientTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package com.sksamuel.elastic4s.http4s | ||
|
||
import cats.effect.IO | ||
import cats.effect.unsafe.implicits.global | ||
import com.sksamuel.elastic4s.testkit.DockerTests | ||
import com.sksamuel.elastic4s.{Authentication, CommonRequestOptions, ElasticClient, ElasticNodeEndpoint} | ||
import org.http4s.ember.client.EmberClientBuilder | ||
import org.scalatest.flatspec.AnyFlatSpec | ||
import org.scalatest.matchers.should.Matchers | ||
|
||
class Http4sRequestHttpClientTest extends AnyFlatSpec with Matchers with DockerTests { | ||
private val http4s = EmberClientBuilder.default[IO].build.allocated.unsafeRunSync()._1 | ||
private val http4sClient = Http4sClient.usingIO( | ||
http4s, | ||
ElasticNodeEndpoint("http", elasticHost, elasticPort.toInt, None), | ||
) | ||
override val client: ElasticClient = ElasticClient(http4sClient) | ||
|
||
"Http4sRequestHttpClient" should "be able to call elasticsearch" in { | ||
client.execute { | ||
catHealth() | ||
}.await.result.status shouldBe "green" | ||
} | ||
|
||
it should "be able to propagate headers if included" in { | ||
implicit val options: CommonRequestOptions = CommonRequestOptions.defaults.copy( | ||
authentication = Authentication.UsernamePassword("user123", "pass123") | ||
) | ||
|
||
client.execute { | ||
catHealth() | ||
}.await.result.status shouldBe "401" | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters