Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ThisBuild / tlBaseVersion := "0.6"

val http4sVersion = "0.23.33"
val natchezVersion = "0.3.8"
val natchezVersion = "0.3.8-134-e5a1826-SNAPSHOT"
val scala212Version = "2.12.21"
val scala213Version = "2.13.18"
val scala3Version = "3.3.7"
Expand Down Expand Up @@ -78,6 +78,10 @@ lazy val http4s = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"org.http4s" %%% "http4s-client" % http4sVersion,
"org.http4s" %%% "http4s-server" % http4sVersion,
"org.tpolecat" %%% "natchez-testkit" % natchezVersion % Test,
"com.comcast" %%% "ip4s-test-kit" % "3.7.0" % Test,
"io.opentelemetry" % "opentelemetry-sdk-common" % "1.57.0" % Test,
"io.opentelemetry.semconv" % "opentelemetry-semconv" % "1.37.0" % Test,
"io.opentelemetry.semconv" % "opentelemetry-semconv-incubating" % "1.37.0-alpha" % Test,
)
)
.dependsOn(core)
Expand Down
150 changes: 120 additions & 30 deletions modules/http4s/src/main/scala/natchez/http4s/NatchezMiddleware.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@

package natchez.http4s

import cats.data.{Kleisli, OptionT}
import cats.syntax.all._
import cats.effect.{MonadCancel, MonadCancelThrow, Outcome, Resource}
import cats.effect.syntax.all._
import Outcome._
import natchez.{Span, Tags, Trace, TraceValue}
import cats.Applicative
import cats.data.{Ior, Kleisli, OptionT}
import cats.effect.Outcome.*
import cats.effect.syntax.all.*
import cats.effect.{MonadCancel, MonadCancelThrow, Resource}
import cats.syntax.all.*
import natchez.Span.Options.Defaults
import natchez.Span.SpanKind
import natchez.http4s.implicits.*
import natchez.*
import org.http4s.client.Client
import org.http4s.HttpRoutes
import org.http4s.{Request, Response}
import org.http4s.headers.Host
import org.http4s.{HttpRoutes, Request, Response, Uri, headers}

import java.io.ByteArrayOutputStream
import java.io.PrintStream
import java.io.{ByteArrayOutputStream, PrintStream}

object NatchezMiddleware {
import syntax.kernel._
import syntax.kernel.*

@deprecated("Use NatchezMiddleware.server(routes)", "0.0.3")
def apply[F[_]: Trace](routes: HttpRoutes[F])(
Expand All @@ -42,21 +43,35 @@ object NatchezMiddleware {
* - "error.stacktrace" -> Exception stack trace as a multi-line string
* - "cancelled" -> true // only present in case of cancellation
*/
def server[F[_]: Trace](routes: HttpRoutes[F])(
implicit ev: MonadCancel[F, Throwable]
): HttpRoutes[F] =
def server[F[_]: Trace : MonadCancelThrow](routes: HttpRoutes[F]): HttpRoutes[F] =
server(routes, useOpenTelemetrySemanticConventions = false)

def server[F[_]: Trace : MonadCancelThrow](routes: HttpRoutes[F],
useOpenTelemetrySemanticConventions: Boolean,
): HttpRoutes[F] =
Kleisli { req =>

val addRequestFields: F[Unit] =
Trace[F].put(
Tags.http.method(req.method.name),
Tags.http.url(req.uri.renderString),
)
if (useOpenTelemetrySemanticConventions)
Trace[F].put(
"http.request.method" -> req.method,
"url.full" -> req.uri,
)
else
Trace[F].put(
Tags.http.method(req.method.name),
Tags.http.url(req.uri.renderString),
)

def addResponseFields(res: Response[F]): F[Unit] =
Trace[F].put(
Tags.http.status_code(res.status.code.toString)
)
if (useOpenTelemetrySemanticConventions)
Trace[F].put(
"http.response.status_code" -> res.status.code,
)
else
Trace[F].put(
Tags.http.status_code(res.status.code.toString)
)

def addErrorFields(e: Throwable): F[Unit] =
Trace[F].put(
Expand Down Expand Up @@ -95,7 +110,12 @@ object NatchezMiddleware {
*
*/
def client[F[_] : Trace : MonadCancelThrow](client: Client[F]): Client[F] =
NatchezMiddleware.client(client, _ => Seq.empty[(String, TraceValue)].pure[F])
NatchezMiddleware.client(client, useOpenTelemetrySemanticConventions = false)

def client[F[_] : Trace : MonadCancelThrow](client: Client[F],
useOpenTelemetrySemanticConventions: Boolean,
): Client[F] =
NatchezMiddleware.client(client, _ => Seq.empty[(String, TraceValue)].pure[F], useOpenTelemetrySemanticConventions)

/**
* A middleware that adds the current span's kernel to outgoing requests, performs requests in
Expand All @@ -112,7 +132,13 @@ object NatchezMiddleware {
*/
def clientWithAttributes[F[_] : Trace : MonadCancelThrow](client: Client[F])
(additionalAttributes: (String, TraceValue)*): Client[F] =
NatchezMiddleware.client(client, (_: Request[F]) => additionalAttributes.pure[F])
NatchezMiddleware.clientWithAttributes(client, useOpenTelemetrySemanticConventions = false)(additionalAttributes *)

def clientWithAttributes[F[_] : Trace : MonadCancelThrow](client: Client[F],
useOpenTelemetrySemanticConventions: Boolean,
)
(additionalAttributes: (String, TraceValue)*): Client[F] =
NatchezMiddleware.client(client, (_: Request[F]) => additionalAttributes.pure[F], useOpenTelemetrySemanticConventions)

/**
* A middleware that adds the current span's kernel to outgoing requests, performs requests in
Expand All @@ -130,7 +156,14 @@ object NatchezMiddleware {
def clientWithAttributes[F[_] : Trace : MonadCancelThrow](client: Client[F],
spanOptions: Span.Options)
(additionalAttributes: (String, TraceValue)*): Client[F] =
NatchezMiddleware.client(client, spanOptions, (_: Request[F]) => additionalAttributes.pure[F])
NatchezMiddleware.clientWithAttributes(client, spanOptions, useOpenTelemetrySemanticConventions = false)(additionalAttributes *)

def clientWithAttributes[F[_] : Trace : MonadCancelThrow](client: Client[F],
spanOptions: Span.Options,
useOpenTelemetrySemanticConventions: Boolean,
)
(additionalAttributes: (String, TraceValue)*): Client[F] =
NatchezMiddleware.client(client, spanOptions, (_: Request[F]) => additionalAttributes.pure[F], useOpenTelemetrySemanticConventions)

/**
* A middleware that adds the current span's kernel to outgoing requests, performs requests in
Expand All @@ -148,7 +181,13 @@ object NatchezMiddleware {
def client[F[_] : Trace : MonadCancelThrow](client: Client[F],
additionalAttributesF: Request[F] => F[Seq[(String, TraceValue)]],
): Client[F] =
NatchezMiddleware.client(client, Defaults.withSpanKind(SpanKind.Client), additionalAttributesF)
NatchezMiddleware.client(client, additionalAttributesF, useOpenTelemetrySemanticConventions = false)

def client[F[_] : Trace : MonadCancelThrow](client: Client[F],
additionalAttributesF: Request[F] => F[Seq[(String, TraceValue)]],
useOpenTelemetrySemanticConventions: Boolean,
): Client[F] =
NatchezMiddleware.client(client, Defaults.withSpanKind(SpanKind.Client), additionalAttributesF, useOpenTelemetrySemanticConventions)

/**
* A middleware that adds the current span's kernel to outgoing requests, performs requests in
Expand All @@ -167,23 +206,74 @@ object NatchezMiddleware {
spanOptions: Span.Options,
additionalAttributesF: Request[F] => F[Seq[(String, TraceValue)]],
): Client[F] =
NatchezMiddleware.client(client, spanOptions, additionalAttributesF, useOpenTelemetrySemanticConventions = false)

def client[F[_] : Trace : MonadCancelThrow](client: Client[F],
spanOptions: Span.Options,
additionalAttributesF: Request[F] => F[Seq[(String, TraceValue)]],
useOpenTelemetrySemanticConventions: Boolean,
): Client[F] =
Client { req =>
Resource.applyFull {poll =>
Trace[F].span("http4s-client-request", spanOptions) {
Trace[F].span(spanName(req, useOpenTelemetrySemanticConventions), spanOptions) {
for {
knl <- Trace[F].kernel
_ <- Trace[F].put(
"client.http.uri" -> req.uri.toString(),
"client.http.method" -> req.method.toString
httpUrlKey(useOpenTelemetrySemanticConventions) -> req.uri,
httpMethodKey(useOpenTelemetrySemanticConventions) -> req.method,
)
_ <- addAttributeFromUriAuthorityOrHeader("server.address")(fromUri = _.host.some, fromHeader = _.host.some)(req)
_ <- addAttributeFromUriAuthorityOrHeader("server.port")(fromUri = _.port, fromHeader = _.port)(req)
_ <- req.uri.scheme.asAttribute[F]("url.scheme")
additionalAttributes <- additionalAttributesF(req)
_ <- Trace[F].put(additionalAttributes: _*)
_ <- Trace[F].put(additionalAttributes *)
reqʹ = req.withHeaders(knl.toHttp4sHeaders ++ req.headers) // prioritize request headers over kernel ones
rsrc <- poll(client.run(reqʹ).allocatedCase)
_ <- Trace[F].put("client.http.status_code" -> rsrc._1.status.code.toString())
_ <- Trace[F].put(httpStatusCodeKey(useOpenTelemetrySemanticConventions) -> rsrc._1.status.code)
} yield rsrc
}
}
}

private def addAttributeFromUriAuthorityOrHeader[F[_] : Applicative : Trace, A: TraceableValue, B: TraceableValue](key: String)
(fromUri: Uri.Authority => Option[A],
fromHeader: headers.Host => Option[B])
(req: Request[F]): F[Unit] =
Ior.fromOptions(req.headers.get[Host].flatMap(fromHeader), req.uri.authority.flatMap(fromUri))
.map(_.toEither)
.asAttribute(key)

private implicit class OptionTraceOps[A](val maybeA: Option[A]) extends AnyVal {
def asAttribute[F[_] : Applicative : Trace](key: String)(implicit T: TraceableValue[A]): F[Unit] =
maybeA.traverse_(a => Trace[F].put(key -> a))
}

/**
* See https://opentelemetry.io/docs/specs/semconv/http/http-spans/#http-client-span and
* https://opentelemetry.io/docs/specs/semconv/registry/attributes/url/#url-full
*/
private def httpUrlKey(useOpenTelemetrySemanticConventions: Boolean): String =
if (useOpenTelemetrySemanticConventions) "url.full" else "client.http.uri"

/**
* See https://opentelemetry.io/docs/specs/semconv/http/http-spans/#http-client-span and
* https://opentelemetry.io/docs/specs/semconv/registry/attributes/http/#http-request-method
*/
private def httpMethodKey(useOpenTelemetrySemanticConventions: Boolean): String =
if (useOpenTelemetrySemanticConventions) "http.request.method" else "client.http.method"

/**
* See https://opentelemetry.io/docs/specs/semconv/http/http-spans/#http-client-span and
* https://opentelemetry.io/docs/specs/semconv/registry/attributes/http/#http-response-status-code
*/
private def httpStatusCodeKey(useOpenTelemetrySemanticConventions: Boolean): String =
if (useOpenTelemetrySemanticConventions) "http.response.status_code" else "client.http.status_code"

/**
* See https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name
*/
private def spanName[F[_]](request: Request[F],
useOpenTelemetrySemanticConventions: Boolean): String =
if (useOpenTelemetrySemanticConventions) request.method.name else "http4s-client-request"

}
11 changes: 9 additions & 2 deletions modules/http4s/src/main/scala/natchez/http4s/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@

package natchez

import org.http4s.{Method, Uri, headers}

package object http4s {
object implicits
extends syntax.ToEntryPointOps
with syntax.ToKernelOps
with syntax.ToKernelCompanionOps
with syntax.ToKernelCompanionOps {
implicit lazy val traceableValueUri: TraceableValue[Uri] = TraceableValue[String].contramap(_.renderString)
implicit lazy val traceableValueHost: TraceableValue[Uri.Host] = TraceableValue[String].contramap(_.renderString)
implicit lazy val traceableValueMethod: TraceableValue[Method] = TraceableValue[String].contramap(_.renderString)
implicit lazy val traceableValueScheme: TraceableValue[Uri.Scheme] = TraceableValue[String].contramap(_.value)
implicit lazy val traceableValueHostHeader: TraceableValue[headers.Host] = TraceableValue[String].contramap(_.host)
}
}

21 changes: 21 additions & 0 deletions modules/http4s/src/test/scala/natchez/http4s/InMemory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ package http4s

import cats.data.Kleisli
import cats.effect.{IO, MonadCancelThrow}
import com.comcast.ip4s.Arbitraries.*
import com.comcast.ip4s.{Hostname, Port}
import munit.CatsEffectSuite
import org.http4s.{Uri, headers}
import org.scalacheck.{Arbitrary, Gen}
import org.typelevel.ci.*

trait InMemorySuite
Expand Down Expand Up @@ -63,4 +67,21 @@ trait InMemorySuite
val CustomHeaderName = ci"X-Custom-Header"
val CorrelationIdName = ci"X-Correlation-Id"

implicit val arbUriAuthority: Arbitrary[Uri.Authority] = Arbitrary {
for {
host <- Arbitrary.arbitrary[Hostname]
port <- Arbitrary.arbitrary[Option[Port]]
} yield Uri.Authority(host = Uri.Host.fromIp4sHost(host), port = port.map(_.value))
}

implicit val arbHostHeader: Arbitrary[headers.Host] = Arbitrary {
for {
host <- Arbitrary.arbitrary[Hostname]
port <- Arbitrary.arbitrary[Option[Port]]
} yield headers.Host(host.toString, port.map(_.value))
}

implicit val arbUriScheme: Arbitrary[Uri.Scheme] = Arbitrary {
Gen.oneOf(Uri.Scheme.http, Uri.Scheme.https)
}
}
Loading
Loading