diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 05b6f8cc..2301f779 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,11 +87,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p plugin/target runtime/target/jvm-2.12 codegen/target/jvm-3 codegen/target/jvm-2.13 runtime/target/jvm-3 codegen/target/jvm-2.12 runtime/target/jvm-2.13 project/target + run: mkdir -p plugin/target otel4s-trace/target/jvm-3 runtime/target/jvm-2.12 codegen/target/jvm-3 codegen/target/jvm-2.13 runtime/target/jvm-3 codegen/target/jvm-2.12 runtime/target/jvm-2.13 otel4s-trace/target/jvm-2.13 project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar plugin/target runtime/target/jvm-2.12 codegen/target/jvm-3 codegen/target/jvm-2.13 runtime/target/jvm-3 codegen/target/jvm-2.12 runtime/target/jvm-2.13 project/target + run: tar cf targets.tar plugin/target otel4s-trace/target/jvm-3 runtime/target/jvm-2.12 codegen/target/jvm-3 codegen/target/jvm-2.13 runtime/target/jvm-3 codegen/target/jvm-2.12 runtime/target/jvm-2.13 otel4s-trace/target/jvm-2.13 project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') @@ -224,7 +224,7 @@ jobs: - name: Submit Dependencies uses: scalacenter/sbt-dependency-submission@v2 with: - modules-ignore: root_3 e2e_2.12 e2e_3 e2e_2.13 + modules-ignore: root_3 e2e_2.12 e2eotel4s_3 e2eotel4s_2.13 e2e_3 e2e_2.13 configs-ignore: test scala-tool scala-doc-tool test-internal validate-steward: diff --git a/README.md b/README.md index c91e6752..844bf445 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,51 @@ The full set of options available are: PB.protocOptions in Compile := Seq("-xyz") ``` +## OpenTelemetry tracing with otel4s + +The `fs2-grpc-otel4s-trace` module provides client and service aspects for tracing with [otel4s](https://typelevel.org/otel4s/). +The default configuration follows the OpenTelemetry gRPC semantic conventions for span kind, span name, +`rpc.system.name`, `rpc.method`, and `rpc.response.status_code`. + +```scala +import cats.effect.IO +import fs2.grpc.otel4s.trace.{TraceClientAspect, TraceServiceAspect} +import io.grpc.Metadata +import org.typelevel.otel4s.trace.TracerProvider + +def clientAspect(implicit tracerProvider: TracerProvider[IO]) = + TraceClientAspect.create[IO] + +def serviceAspect(implicit tracerProvider: TracerProvider[IO]) = + TraceServiceAspect.create[IO] +``` + +Use `withServerAddress` when the logical server address is known. The aspect cannot infer it from the generated call +context: + +```scala +val clientConfig = + TraceClientAspect.Config.default + .withServerAddress("grpc.example.com", Some(443)) + +val serviceConfig = + TraceServiceAspect.Config.default + .withServerAddress("grpc.example.com", Some(443)) +``` + +Other defaults can be customized through `Config`, for example span names, metadata propagation, attributes, and +finalization. + +```scala +val clientConfig = + TraceClientAspect.Config.default + .withTracerName("my-client") + .withSpanName((_, ctx) => ctx.methodDescriptor.getFullMethodName) +``` + +See the [OpenTelemetry gRPC semantic conventions](https://opentelemetry.io/docs/specs/semconv/rpc/grpc/) for the +attribute definitions. + ### Tool Sponsorship Development of fs2-grpc is generously supported in part by [YourKit](https://www.yourkit.com) through the use of their excellent Java profiler. diff --git a/build.sbt b/build.sbt index a7667cdd..d14b88af 100644 --- a/build.sbt +++ b/build.sbt @@ -53,7 +53,8 @@ inThisBuild( ) lazy val projects = - runtime.projectRefs ++ codegen.projectRefs ++ e2e.projectRefs ++ List(plugin.project, protocGen.agg.project) + runtime.projectRefs ++ otel4sTrace.projectRefs ++ codegen.projectRefs ++ e2e.projectRefs ++ e2eOtel4s.projectRefs ++ + List(plugin.project, protocGen.agg.project) lazy val root = (project in file(".")) .enablePlugins(BuildInfoPlugin, NoPublishPlugin) @@ -112,6 +113,7 @@ lazy val plugin = project lazy val runtime = (projectMatrix in file("runtime")) .defaultAxes(axesDefault: _*) + .enablePlugins(BuildInfoPlugin) .settings( name := "fs2-grpc-runtime", tlVersionIntroduced := Map("2.12" -> "2.5.3", "2.13" -> "2.5.3", "3" -> "2.5.3"), @@ -124,10 +126,32 @@ lazy val runtime = (projectMatrix in file("runtime")) scalacOptions ++= { if (tlIsScala3.value) { Seq("-language:implicitConversions", "-Ykind-projector", "-source:3.0-migration") } else Seq.empty - } + }, + buildInfoPackage := "fs2.grpc", + buildInfoKeys := Seq[BuildInfoKey](version), + buildInfoOptions += BuildInfoOption.PackagePrivate ) .jvmPlatform(scalaVersions = Seq(Scala212, Scala213, Scala3)) +lazy val otel4sTrace = (projectMatrix in file("otel4s-trace")) + .dependsOn(runtime) + .defaultAxes(axesDefault: _*) + .settings( + name := "fs2-grpc-otel4s-trace", + tlVersionIntroduced := Map("2.13" -> "3.1.0", "3" -> "3.1.0"), + libraryDependencies ++= List(otel4sCoreTrace, otel4sSemconv) ++ List(grpcNetty, ceTestkit, ceMunit).map(_ % Test), + Test / parallelExecution := false, + scalacOptions := { + if (tlIsScala3.value) { scalacOptions.value.filterNot(_ == "-Ykind-projector:underscores") } + else scalacOptions.value + }, + scalacOptions ++= { + if (tlIsScala3.value) { Seq("-language:implicitConversions", "-Ykind-projector", "-source:3.0-migration") } + else Seq.empty + } + ) + .jvmPlatform(scalaVersions = Seq(Scala213, Scala3)) + lazy val codeGenJVM212 = codegen.jvm(Scala212) lazy val protocGen = protocGenProject("protoc-gen-fs2-grpc", codeGenJVM212) .settings( @@ -194,3 +218,36 @@ lazy val e2e = (projectMatrix in file("e2e")) } ) .jvmPlatform(scalaVersions = Seq(Scala212, Scala213, Scala3)) + +lazy val e2eOtel4s = (projectMatrix in file("e2e-otel4s")) + .dependsOn(runtime, otel4sTrace) + .defaultAxes(axesDefault: _*) + .enablePlugins(LocalCodeGenPlugin, NoPublishPlugin) + .settings( + codeGenClasspath := (codeGenJVM212 / Compile / fullClasspath).value, + libraryDependencies := Nil, + libraryDependencies ++= List( + scalaPbGrpcRuntime, + scalaPbRuntime, + scalaPbRuntime % "protobuf", + ceMunit % Test, + otel4sOtelJavaTestkit % Test, + otel4sSemconvExperimental % Test, + "io.grpc" % "grpc-inprocess" % versions.grpc % Test + ), + Compile / PB.targets := Seq( + scalapb.gen() -> (Compile / sourceManaged).value / "scalapb", + genModule(codegenFullName + "$") -> (Compile / sourceManaged).value / "fs2-grpc" + ), + githubWorkflowArtifactUpload := false, + scalacOptions := { + if (tlIsScala3.value) { + scalacOptions.value.filterNot(o => o == "-Ykind-projector:underscores" || o == "-Wvalue-discard") + } else scalacOptions.value + }, + scalacOptions ++= { + if (tlIsScala3.value) { Seq("-language:implicitConversions", "-Ykind-projector", "-source:3.0-migration") } + else Seq.empty + } + ) + .jvmPlatform(scalaVersions = Seq(Scala213, Scala3)) diff --git a/e2e-otel4s/src/main/protobuf/test_service.proto b/e2e-otel4s/src/main/protobuf/test_service.proto new file mode 100644 index 00000000..da92ff9a --- /dev/null +++ b/e2e-otel4s/src/main/protobuf/test_service.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package hello.world; + +// TestService: Example gRPC service used in e2e tests +// It demonstrates all four RPC shapes. +service TestService { + // Unary RPC: no streaming in either direction + rpc noStreaming (TestRequest) returns (TestResponse); + // Client streaming RPC: client streams, server returns a single response + rpc clientStreaming (stream TestRequest) returns (TestResponse); + // Server streaming RPC: client sends one request, server streams responses + rpc serverStreaming (TestRequest) returns (stream TestResponse); + // Bidirectional streaming RPC: both client and server stream + rpc bothStreaming (stream TestRequest) returns (stream TestResponse); +} + +message TestRequest {} +message TestResponse {} \ No newline at end of file diff --git a/e2e-otel4s/src/test/scala/fs2/grpc/e2e/otel4s/TraceAspectSuite.scala b/e2e-otel4s/src/test/scala/fs2/grpc/e2e/otel4s/TraceAspectSuite.scala new file mode 100644 index 00000000..a955d830 --- /dev/null +++ b/e2e-otel4s/src/test/scala/fs2/grpc/e2e/otel4s/TraceAspectSuite.scala @@ -0,0 +1,509 @@ +/* + * Copyright (c) 2018 Gary Coady / Fs2 Grpc Developers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.grpc.e2e.otel4s + +import cats.effect.std.Dispatcher +import cats.effect.{Async, IO, Resource} +import fs2.Stream +import fs2.grpc.client.ClientOptions +import fs2.grpc.otel4s.trace.{TraceClientAspect, TraceServiceAspect} +import fs2.grpc.server.ServerOptions +import fs2.grpc.syntax.all._ +import hello.world.test_service.{ + TestRequest, + TestResponse, + TestServiceFs2Grpc, + TestServiceFs2GrpcTrailers, + TestServiceGrpc +} +import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} +import io.grpc.{Channel, Metadata, Server, ServerServiceDefinition, Status, StatusRuntimeException} +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator +import munit.{CatsEffectSuite, Location, TestOptions} +import org.typelevel.otel4s.oteljava.testkit.OtelJavaTestkit +import org.typelevel.otel4s.oteljava.testkit.trace.{ + SpanContextExpectation, + SpanExpectation, + TraceExpectation, + TraceExpectations, + TraceForestExpectation +} +import org.typelevel.otel4s.semconv.experimental.attributes.RpcExperimentalAttributes +import org.typelevel.otel4s.trace.{SpanContext, SpanKind, Tracer} +import org.typelevel.otel4s.{Attribute, Attributes} + +class TraceAspectSuite extends CatsEffectSuite { + + withFixture("follow request's span") { fixture => + for { + rootSpanContext <- IO.deferred[SpanContext] + response <- fixture.tracer.span("root").use { span => + rootSpanContext.complete(span.context) *> fixture.client.noStreaming(TestRequest(), new Metadata()) + } + + traceContext <- rootSpanContext.get + + _ <- fixture.assertTraces( + expectedSuccessfulTrace( + traceContext.traceIdHex, + TestServiceGrpc.METHOD_NO_STREAMING.getFullMethodName, + "internal-handler:noStreaming" + ) + ) + } yield { + // server middleware shouldn't inject tracing info into response metadata + assertEquals(response._2.keys().size(), 0) + } + } + + withFixture("propagate the client span for server streaming calls") { fixture => + for { + rootSpanContext <- IO.deferred[SpanContext] + _ <- fixture.tracer.span("root").use { span => + rootSpanContext.complete(span.context) *> + fixture.client.serverStreaming(TestRequest(), new Metadata()).compile.drain + } + + traceContext <- rootSpanContext.get + + _ <- fixture.assertTraces( + expectedSuccessfulTrace( + traceContext.traceIdHex, + TestServiceGrpc.METHOD_SERVER_STREAMING.getFullMethodName, + "internal-handler:serverStreaming" + ) + ) + } yield () + } + + withFixture("propagate the client span for bidirectional streaming calls") { fixture => + for { + rootSpanContext <- IO.deferred[SpanContext] + _ <- fixture.tracer.span("root").use { span => + rootSpanContext.complete(span.context) *> + fixture.client.bothStreaming(Stream.emit(TestRequest()), new Metadata()).compile.drain + } + + traceContext <- rootSpanContext.get + + _ <- fixture.assertTraces( + expectedSuccessfulTrace( + traceContext.traceIdHex, + TestServiceGrpc.METHOD_BOTH_STREAMING.getFullMethodName, + "internal-handler:bothStreaming" + ) + ) + } yield () + } + + withFixture( + "reflect config changes", + TraceClientAspect.Config.default + .withTracerName("client-tracer") + .withAttributes((a, _) => Attributes(Attribute("client-operation", a.toString))), + TraceServiceAspect.Config.default + .withTracerName("service-tracer") + .withAttributes((a, _) => Attributes(Attribute("service-operation", a.toString))) + ) { fixture => + def expectedTraces(traceId: String): TraceForestExpectation = + TraceForestExpectation.ordered( + TraceExpectation.ordered( + spanExpectation("root", Attributes.empty, traceId, SpanKind.Internal), + TraceExpectation.ordered( + spanExpectation( + TestServiceGrpc.METHOD_NO_STREAMING.getFullMethodName, + Attributes( + Attribute("client-operation", "UnaryToUnaryCallTrailers"), + RpcExperimentalAttributes.RpcResponseStatusCode("OK") + ), + traceId, + SpanKind.Client + ), + TraceExpectation.ordered( + spanExpectation( + TestServiceGrpc.METHOD_NO_STREAMING.getFullMethodName, + Attributes( + Attribute("service-operation", "UnaryToUnaryCall"), + RpcExperimentalAttributes.RpcResponseStatusCode("OK") + ), + traceId, + SpanKind.Server + ), + TraceExpectation.leaf( + spanExpectation( + "internal-handler:noStreaming", + Attributes.empty, + traceId, + SpanKind.Internal + ) + ) + ) + ) + ) + ) + + for { + rootSpanContext <- IO.deferred[SpanContext] + response <- fixture.tracer.span("root").use { span => + rootSpanContext.complete(span.context) *> fixture.client.noStreaming(TestRequest(), new Metadata()) + } + + traceContext <- rootSpanContext.get + + _ <- fixture.assertTraces(expectedTraces(traceContext.traceIdHex)) + } yield { + // server middleware shouldn't inject tracing info into response metadata + assertEquals(response._2.keys().size(), 0) + } + } + + withFixture( + "include configured server address attributes", + TraceClientAspect.Config.default.withServerAddress("client.example", Some(443)), + TraceServiceAspect.Config.default.withServerAddress("service.example", Some(50051)) + ) { fixture => + for { + rootSpanContext <- IO.deferred[SpanContext] + response <- fixture.tracer.span("root").use { span => + rootSpanContext.complete(span.context) *> fixture.client.noStreaming(TestRequest(), new Metadata()) + } + + traceContext <- rootSpanContext.get + + _ <- fixture.assertTraces( + expectedSuccessfulTrace( + traceContext.traceIdHex, + TestServiceGrpc.METHOD_NO_STREAMING.getFullMethodName, + "internal-handler:noStreaming", + clientAttributes = Attributes(Attribute("server.address", "client.example"), Attribute("server.port", 443L)), + serviceAttributes = + Attributes(Attribute("server.address", "service.example"), Attribute("server.port", 50051L)) + ) + ) + } yield { + assertEquals(response._2.keys().size(), 0) + } + } + + withFixture( + "include configured server address without port", + TraceClientAspect.Config.default.withServerAddress("/run/client.sock", None), + TraceServiceAspect.Config.default.withServerAddress("/run/service.sock", None) + ) { fixture => + for { + rootSpanContext <- IO.deferred[SpanContext] + response <- fixture.tracer.span("root").use { span => + rootSpanContext.complete(span.context) *> fixture.client.noStreaming(TestRequest(), new Metadata()) + } + + traceContext <- rootSpanContext.get + + _ <- fixture.assertTraces( + expectedSuccessfulTrace( + traceContext.traceIdHex, + TestServiceGrpc.METHOD_NO_STREAMING.getFullMethodName, + "internal-handler:noStreaming", + clientAttributes = Attributes(Attribute("server.address", "/run/client.sock")), + serviceAttributes = Attributes(Attribute("server.address", "/run/service.sock")) + ) + ) + } yield { + assertEquals(response._2.keys().size(), 0) + } + } + + withFixture( + "mark server error status codes as errors", + serviceBehavior = ServiceBehavior.failNoStreaming(Status.INTERNAL) + ) { fixture => + for { + rootSpanContext <- IO.deferred[SpanContext] + result <- fixture.tracer.span("root").use { span => + rootSpanContext.complete(span.context) *> + fixture.client.noStreaming(TestRequest(), new Metadata()).attempt + } + + traceContext <- rootSpanContext.get + + _ <- IO(assertEquals(result.left.map(_.getClass), Left(classOf[StatusRuntimeException]))) + _ <- fixture.assertTraces( + expectedFailedTrace( + traceContext.traceIdHex, + TestServiceGrpc.METHOD_NO_STREAMING.getFullMethodName, + Status.Code.INTERNAL.name(), + clientError = true, + serverError = true + ) + ) + } yield () + } + + withFixture( + "do not mark non-server error status codes as server span errors", + serviceBehavior = ServiceBehavior.failNoStreaming(Status.INVALID_ARGUMENT) + ) { fixture => + for { + rootSpanContext <- IO.deferred[SpanContext] + result <- fixture.tracer.span("root").use { span => + rootSpanContext.complete(span.context) *> + fixture.client.noStreaming(TestRequest(), new Metadata()).attempt + } + + traceContext <- rootSpanContext.get + + _ <- IO(assertEquals(result.left.map(_.getClass), Left(classOf[StatusRuntimeException]))) + _ <- fixture.assertTraces( + expectedFailedTrace( + traceContext.traceIdHex, + TestServiceGrpc.METHOD_NO_STREAMING.getFullMethodName, + Status.Code.INVALID_ARGUMENT.name(), + clientError = true, + serverError = false + ) + ) + } yield () + } + + private def spanExpectation( + name: String, + attributes: Attributes, + traceId: String, + kind: SpanKind + ): SpanExpectation = + SpanExpectation + .name(name) + .kind(kind) + .attributesExact(attributes) + .spanContext(SpanContextExpectation.any.traceIdHex(traceId)) + + private def expectedSuccessfulTrace( + traceId: String, + methodName: String, + handlerSpanName: String, + clientAttributes: Attributes = Attributes.empty, + serviceAttributes: Attributes = Attributes.empty + ): TraceForestExpectation = { + val attributes = Attributes( + RpcExperimentalAttributes.RpcSystemName(RpcExperimentalAttributes.RpcSystemValue.Grpc.value), + RpcExperimentalAttributes.RpcMethod(methodName), + RpcExperimentalAttributes.RpcResponseStatusCode("OK") + ) + + TraceForestExpectation.ordered( + TraceExpectation.ordered( + spanExpectation("root", Attributes.empty, traceId, SpanKind.Internal), + TraceExpectation.ordered( + spanExpectation( + methodName, + attributes ++ clientAttributes, + traceId, + SpanKind.Client + ), + TraceExpectation.ordered( + spanExpectation( + methodName, + attributes ++ serviceAttributes, + traceId, + SpanKind.Server + ), + TraceExpectation.leaf( + spanExpectation( + handlerSpanName, + Attributes.empty, + traceId, + SpanKind.Internal + ) + ) + ) + ) + ) + ) + } + + private def expectedFailedTrace( + traceId: String, + methodName: String, + code: String, + clientError: Boolean, + serverError: Boolean + ): TraceForestExpectation = { + val baseAttributes = Attributes( + RpcExperimentalAttributes.RpcSystemName(RpcExperimentalAttributes.RpcSystemValue.Grpc.value), + RpcExperimentalAttributes.RpcMethod(methodName), + RpcExperimentalAttributes.RpcResponseStatusCode(code) + ) + val clientAttributes = + if (clientError) baseAttributes ++ Attributes(Attribute("error.type", code)) else baseAttributes + val serverAttributes = + if (serverError) baseAttributes ++ Attributes(Attribute("error.type", code)) else baseAttributes + + TraceForestExpectation.ordered( + TraceExpectation.ordered( + spanExpectation("root", Attributes.empty, traceId, SpanKind.Internal), + TraceExpectation.ordered( + spanExpectation( + methodName, + clientAttributes, + traceId, + SpanKind.Client + ), + TraceExpectation.leaf( + spanExpectation( + methodName, + serverAttributes, + traceId, + SpanKind.Server + ) + ) + ) + ) + ) + } + + private def withFixture[A]( + opts: TestOptions, + clientConfig: TraceClientAspect.Config = TraceClientAspect.Config.default, + serviceConfig: TraceServiceAspect.Config = TraceServiceAspect.Config.default, + serviceBehavior: ServiceBehavior = ServiceBehavior.default + )(f: Fix => IO[A])(implicit loc: Location): Unit = + test(opts) { + mkFixture(clientConfig, serviceConfig, serviceBehavior).use(f) + } + + private def mkFixture( + clientConfig: TraceClientAspect.Config, + serviceConfig: TraceServiceAspect.Config, + serviceBehavior: ServiceBehavior + ): Resource[IO, Fix] = + for { + testkit <- OtelJavaTestkit.builder[IO].addTextMapPropagators(W3CTraceContextPropagator.getInstance()).build + + dispatcher <- Dispatcher.parallel[IO] + + tracerProvider = testkit.tracerProvider + + serviceAspect <- TraceServiceAspect.create[IO](serviceConfig)(Async[IO], tracerProvider).toResource + clientAspect <- TraceClientAspect.create[IO](clientConfig)(Async[IO], tracerProvider).toResource + + tracer <- testkit.tracerProvider.get("service").toResource + + serviceDefinition = TestServiceFs2Grpc.serviceFull( + dispatcher, + new TestService(serviceBehavior)(tracer), + serviceAspect, + ServerOptions.default + ) + + id <- IO.randomUUID.map(_.toString).toResource + + _ <- startServices(id)(serviceDefinition) + + channel <- bindClientChannel(id) + + client = TestServiceFs2GrpcTrailers.mkClientFull( + dispatcher, + channel, + clientAspect, + ClientOptions.default + ) + } yield new Fix(client, testkit, tracer) + + private final class Fix( + val client: TestServiceFs2GrpcTrailers[IO, Metadata], + val testkit: OtelJavaTestkit[IO], + val tracer: Tracer[IO] + ) { + def assertTraces(expectation: TraceForestExpectation)(implicit loc: Location): IO[Unit] = + testkit.finishedSpans.flatMap { spans => + TraceExpectations.check(spans, expectation) match { + case Right(_) => + IO.unit + case Left(mismatches) => + IO(fail(TraceExpectations.format(mismatches))) + } + } + + } + + sealed trait Op + object Op { + case object NoStreaming extends Op + case object ClientStreaming extends Op + case object ServerStreaming extends Op + case object BothStreaming extends Op + } + + case class ServerEvent(op: Op, ctx: Metadata) + + private case class ServiceBehavior(noStreamingFailure: Option[Status]) + + private object ServiceBehavior { + val default: ServiceBehavior = + ServiceBehavior(None) + + def failNoStreaming(status: Status): ServiceBehavior = + ServiceBehavior(Some(status)) + } + + private class TestService(behavior: ServiceBehavior)(implicit T: Tracer[IO]) + extends TestServiceFs2Grpc[IO, Metadata] { + def noStreaming(request: TestRequest, ctx: Metadata): IO[TestResponse] = + behavior.noStreamingFailure match { + case Some(status) => + IO.raiseError(status.asRuntimeException()) + case None => + T.span("internal-handler:noStreaming").surround { + IO.pure(TestResponse()) + } + } + + def clientStreaming(request: Stream[IO, TestRequest], ctx: Metadata): IO[TestResponse] = + T.span("internal-handler:clientStreaming").surround { + IO.pure(TestResponse()) + } + + def serverStreaming(request: TestRequest, ctx: Metadata): Stream[IO, TestResponse] = + Stream.eval { + T.span("internal-handler:serverStreaming").surround { + IO.pure(TestResponse()) + } + } + + def bothStreaming(request: Stream[IO, TestRequest], ctx: Metadata): Stream[IO, TestResponse] = + Stream.eval { + T.span("internal-handler:bothStreaming").surround { + IO.pure(TestResponse()) + } + } + } + + private def startServices(id: String)(xs: ServerServiceDefinition): Resource[IO, Server] = + InProcessServerBuilder + .forName(id) + .addService(xs) + .resource[IO] + .evalTap(s => IO.delay(s.start())) + + private def bindClientChannel(id: String): Resource[IO, Channel] = + InProcessChannelBuilder.forName(id).usePlaintext().resource[IO] + +} diff --git a/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/AspectOperation.scala b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/AspectOperation.scala new file mode 100644 index 00000000..8dcc8565 --- /dev/null +++ b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/AspectOperation.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2018 Gary Coady / Fs2 Grpc Developers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.grpc.otel4s.trace + +sealed trait AspectOperation + +object AspectOperation { + + case object UnaryToUnaryCall extends AspectOperation + case object UnaryToStreamingCall extends AspectOperation + case object StreamingToUnaryCall extends AspectOperation + case object StreamingToStreamingCall extends AspectOperation + case object UnaryToUnaryCallTrailers extends AspectOperation + case object StreamingToUnaryCallTrailers extends AspectOperation + +} diff --git a/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/CommonAttributes.scala b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/CommonAttributes.scala new file mode 100644 index 00000000..cfdd74ab --- /dev/null +++ b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/CommonAttributes.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2018 Gary Coady / Fs2 Grpc Developers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.grpc.otel4s.trace + +import org.typelevel.otel4s.{Attribute, AttributeKey} + +/** @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/]] + */ +private[trace] object CommonAttributes { + + private object Keys { + val rpcSystemName: AttributeKey[String] = AttributeKey.string("rpc.system.name") + val rpcMethod: AttributeKey[String] = AttributeKey.string("rpc.method") + val rpcResponseStatusCode: AttributeKey[String] = AttributeKey.string("rpc.response.status_code") + } + + val rpcSystem: Attribute[String] = + Keys.rpcSystemName("grpc") + + def rpcMethod(method: String): Attribute[String] = + Keys.rpcMethod(method) + + def rpcResponseStatusCode(statusCode: String): Attribute[String] = + Keys.rpcResponseStatusCode(statusCode) + +} diff --git a/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/GrpcSpanFinalizers.scala b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/GrpcSpanFinalizers.scala new file mode 100644 index 00000000..c68993b7 --- /dev/null +++ b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/GrpcSpanFinalizers.scala @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2018 Gary Coady / Fs2 Grpc Developers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.grpc.otel4s.trace + +import cats.syntax.semigroup._ +import io.grpc.{Status, StatusException, StatusRuntimeException} +import org.typelevel.otel4s.trace.{SpanFinalizer, StatusCode} +import cats.effect.kernel.Resource +import org.typelevel.otel4s.semconv.attributes.ErrorAttributes + +private[trace] object GrpcSpanFinalizers { + + private val serverErrorCodes = Set( + Status.Code.UNKNOWN, + Status.Code.DEADLINE_EXCEEDED, + Status.Code.UNIMPLEMENTED, + Status.Code.INTERNAL, + Status.Code.UNAVAILABLE, + Status.Code.DATA_LOSS + ) + + val client: SpanFinalizer.Strategy = { + case Resource.ExitCase.Succeeded => + SpanFinalizer.addAttribute(CommonAttributes.rpcResponseStatusCode(Status.Code.OK.name())) + + case Resource.ExitCase.Canceled => + val code = Status.Code.CANCELLED.name() + + SpanFinalizer.addAttributes( + CommonAttributes.rpcResponseStatusCode(code), + ErrorAttributes.ErrorType(code) + ) |+| + SpanFinalizer.setStatus(StatusCode.Error, code) + + case Resource.ExitCase.Errored(error) => + val code = clientStatusCode(error).name() + + SpanFinalizer.addAttributes( + CommonAttributes.rpcResponseStatusCode(code), + ErrorAttributes.ErrorType(code) + ) |+| + SpanFinalizer.recordException(error) |+| + SpanFinalizer.setStatus(StatusCode.Error, code) + } + + val server: SpanFinalizer.Strategy = { + case Resource.ExitCase.Succeeded => + SpanFinalizer.addAttribute(CommonAttributes.rpcResponseStatusCode(Status.Code.OK.name())) + + case Resource.ExitCase.Canceled => + SpanFinalizer.addAttribute(CommonAttributes.rpcResponseStatusCode(Status.Code.CANCELLED.name())) + + case Resource.ExitCase.Errored(error) => + val code = serverStatusCode(error) + val base = SpanFinalizer.addAttribute(CommonAttributes.rpcResponseStatusCode(code.name())) + + if (serverErrorCodes(code)) { + base |+| + SpanFinalizer.addAttribute(ErrorAttributes.ErrorType(code.name())) |+| + SpanFinalizer.recordException(error) |+| + SpanFinalizer.setStatus(StatusCode.Error, code.name()) + } else { + base + } + } + + private def clientStatusCode(error: Throwable): Status.Code = + error match { + case ex: StatusException => ex.getStatus.getCode + case ex: StatusRuntimeException => ex.getStatus.getCode + case _ => Status.Code.UNKNOWN + } + + private def serverStatusCode(error: Throwable): Status.Code = + error match { + case ex: StatusException => ex.getStatus.getCode + case ex: StatusRuntimeException => ex.getStatus.getCode + case _ => Status.Code.INTERNAL + } +} diff --git a/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TextMapGetters.scala b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TextMapGetters.scala new file mode 100644 index 00000000..ee69e8cc --- /dev/null +++ b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TextMapGetters.scala @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2018 Gary Coady / Fs2 Grpc Developers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.grpc.otel4s.trace + +import io.grpc.Metadata +import org.typelevel.otel4s.context.propagation.TextMapGetter + +import scala.jdk.CollectionConverters._ + +object TextMapGetters { + + val asciiStringMetadataTextMapGetter: TextMapGetter[Metadata] = + new TextMapGetter[Metadata] { + def get(carrier: Metadata, key: String): Option[String] = + Option(carrier.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))) + + def keys(carrier: Metadata): Iterable[String] = + carrier.keys().asScala + } + +} diff --git a/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TextMapUpdaters.scala b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TextMapUpdaters.scala new file mode 100644 index 00000000..ee5ab310 --- /dev/null +++ b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TextMapUpdaters.scala @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2018 Gary Coady / Fs2 Grpc Developers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.grpc.otel4s.trace + +import io.grpc.Metadata +import org.typelevel.otel4s.context.propagation.TextMapUpdater + +object TextMapUpdaters { + + val asciiStringMetadataTextMapUpdater: TextMapUpdater[Metadata] = + new TextMapUpdater[Metadata] { + def updated(carrier: Metadata, key: String, value: String): Metadata = { + carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value) + carrier + } + } + +} diff --git a/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TraceClientAspect.scala b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TraceClientAspect.scala new file mode 100644 index 00000000..3a6eaf84 --- /dev/null +++ b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TraceClientAspect.scala @@ -0,0 +1,318 @@ +/* + * Copyright (c) 2018 Gary Coady / Fs2 Grpc Developers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.grpc.otel4s.trace + +import cats.effect.MonadCancelThrow +import cats.syntax.all._ +import fs2.Stream +import fs2.grpc.client._ +import io.grpc.Metadata +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.context.propagation.TextMapUpdater +import org.typelevel.otel4s.trace.{SpanFinalizer, SpanKind, Tracer, TracerProvider} +import fs2.grpc.BuildInfo +import org.typelevel.otel4s.semconv.attributes.ServerAttributes + +private class TraceClientAspect[F[_]: MonadCancelThrow: Tracer]( + spanName: (AspectOperation, ClientCallContext[?, ?, Metadata]) => String, + attributes: (AspectOperation, ClientCallContext[?, ?, Metadata]) => Attributes, + finalizationStrategy: (AspectOperation, ClientCallContext[?, ?, Metadata]) => SpanFinalizer.Strategy +)(implicit textMapUpdater: TextMapUpdater[Metadata]) + extends ClientAspect[F, F, Metadata] { + + override def visitUnaryToUnaryCallTrailers[Req, Res]( + callCtx: ClientCallContext[Req, Res, Metadata], + req: Req, + run: (Req, Metadata) => F[(Res, Metadata)] + ): F[(Res, Metadata)] = + propagate(AspectOperation.UnaryToUnaryCallTrailers, callCtx, metadata => run(req, metadata)) + + override def visitStreamingToUnaryCallTrailers[Req, Res]( + callCtx: ClientCallContext[Req, Res, Metadata], + req: Stream[F, Req], + run: (Stream[F, Req], Metadata) => F[(Res, Metadata)] + ): F[(Res, Metadata)] = + propagate(AspectOperation.StreamingToUnaryCallTrailers, callCtx, metadata => run(req, metadata)) + + override def visitUnaryToUnaryCall[Req, Res]( + callCtx: ClientCallContext[Req, Res, Metadata], + req: Req, + run: (Req, Metadata) => F[Res] + ): F[Res] = + propagate(AspectOperation.UnaryToUnaryCall, callCtx, metadata => run(req, metadata)) + + override def visitUnaryToStreamingCall[Req, Res]( + callCtx: ClientCallContext[Req, Res, Metadata], + req: Req, + run: (Req, Metadata) => Stream[F, Res] + ): Stream[F, Res] = + Stream + .resource(span(AspectOperation.UnaryToStreamingCall, callCtx).resource) + .flatMap { res => + Stream + .eval(propagate(callCtx)) + .flatMap(metadata => run(req, metadata)) + .translate(res.trace) + } + + override def visitStreamingToUnaryCall[Req, Res]( + callCtx: ClientCallContext[Req, Res, Metadata], + req: Stream[F, Req], + run: (Stream[F, Req], Metadata) => F[Res] + ): F[Res] = + propagate(AspectOperation.StreamingToUnaryCall, callCtx, metadata => run(req, metadata)) + + override def visitStreamingToStreamingCall[Req, Res]( + callCtx: ClientCallContext[Req, Res, Metadata], + req: Stream[F, Req], + run: (Stream[F, Req], Metadata) => Stream[F, Res] + ): Stream[F, Res] = + Stream + .resource(span(AspectOperation.StreamingToStreamingCall, callCtx).resource) + .flatMap { res => + Stream + .eval(propagate(callCtx)) + .flatMap(metadata => run(req, metadata)) + .translate(res.trace) + } + + private def propagate[A]( + operation: AspectOperation, + callCtx: ClientCallContext[?, ?, Metadata], + fa: Metadata => F[A] + ): F[A] = + MonadCancelThrow[F].uncancelable { poll => + span(operation, callCtx).surround { + propagate(callCtx).flatMap(metadata => poll(fa(metadata))) + } + } + + private def propagate(callCtx: ClientCallContext[?, ?, Metadata]): F[Metadata] = + Tracer[F].propagate(new Metadata()).map { metadata => + metadata.merge(callCtx.ctx) + metadata + } + + private def span(operation: AspectOperation, ctx: ClientCallContext[?, ?, Metadata]) = + Tracer[F] + .spanBuilder(spanName(operation, ctx)) + .addAttributes(attributes(operation, ctx)) + .withSpanKind(SpanKind.Client) + .withFinalizationStrategy(finalizationStrategy(operation, ctx)) + .build +} + +object TraceClientAspect { + + /** Configuration for client tracing. + * + * The default configuration follows OpenTelemetry gRPC semantic conventions for the attributes this aspect can + * determine from fs2-grpc call context. + * + * Use [[withServerAddress]] to add `server.address` and optional `server.port`, since the gRPC channel target is not + * available to the aspect. + * + * @example + * {{{ + * val config = TraceClientAspect.Config.default.withServerAddress("grpc.io", Some(50051)) + * }}} + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/grpc/ OpenTelemetry Semantic conventions for gRPC]] + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans OpenTelemetry Semantic conventions for RPC spans]] + */ + trait Config { + + /** Instrumentation scope name used to obtain the tracer. */ + def tracerName: String + + /** Text map updater used to inject context into outgoing gRPC metadata. */ + def textMapUpdater: TextMapUpdater[Metadata] + + /** Function used to name client spans. */ + def spanName: (AspectOperation, ClientCallContext[?, ?, Metadata]) => String + + /** Function used to add attributes when client spans are created. */ + def attributes: (AspectOperation, ClientCallContext[?, ?, Metadata]) => Attributes + + /** Strategy used to record status, errors, and exceptions when client spans end. */ + def finalizationStrategy: (AspectOperation, ClientCallContext[?, ?, Metadata]) => SpanFinalizer.Strategy + + /** Sets the instrumentation scope name used to obtain the tracer. */ + def withTracerName(tracerName: String): Config + + /** Sets the text map updater used to inject context into outgoing gRPC metadata. */ + def withTextMapUpdater(textMapUpdater: TextMapUpdater[Metadata]): Config + + /** Sets the span name function. + * + * The default is the full gRPC method name, for example `com.example.EchoService/Echo`. + */ + def withSpanName(spanName: (AspectOperation, ClientCallContext[?, ?, Metadata]) => String): Config + + /** Replaces the span attribute function. + * + * You must include `rpc.system.name` and `rpc.method` manually when replacing the defaults. + * + * @example + * {{{ + * val config = + * TraceClientAspect.Config.default.withAttributes { (_, ctx) => + * Attributes( + * CommonAttributes.rpcSystem, + * CommonAttributes.rpcMethod(ctx.methodDescriptor.getFullMethodName) + * ) + * } + * }}} + */ + def withAttributes(attributes: (AspectOperation, ClientCallContext[?, ?, Metadata]) => Attributes): Config + + /** Adds `server.address` and, when provided, `server.port` to client spans. + * + * Use values derived from the gRPC channel target configuration, not connection-level peer information. + * + * @example + * {{{ + * val withPort = TraceClientAspect.Config.default.withServerAddress("grpc.io", Some(50051)) + * val socket = TraceClientAspect.Config.default.withServerAddress("/run/service.sock", None) + * }}} + */ + def withServerAddress(serverAddress: String, serverPort: Option[Int]): Config + + /** Sets the span finalization strategy used to record status, errors, and exceptions. */ + def withFinalizationStrategy( + finalizationStrategy: (AspectOperation, ClientCallContext[?, ?, Metadata]) => SpanFinalizer.Strategy + ): Config + } + + object Config { + + object Defaults { + val tracerName: String = "fs2.grpc" + + val textMapUpdater: TextMapUpdater[Metadata] = TextMapUpdaters.asciiStringMetadataTextMapUpdater + + val spanName: (AspectOperation, ClientCallContext[?, ?, Metadata]) => String = + (_, ctx) => ctx.methodDescriptor.getFullMethodName + + val attributes: (AspectOperation, ClientCallContext[?, ?, Metadata]) => Attributes = + (_, ctx) => + Attributes( + CommonAttributes.rpcSystem, + CommonAttributes.rpcMethod(ctx.methodDescriptor.getFullMethodName) + ) + + val finalizationStrategy: (AspectOperation, ClientCallContext[?, ?, Metadata]) => SpanFinalizer.Strategy = + (_, _) => GrpcSpanFinalizers.client + } + + /** Default client tracing configuration. + * + * Emits `rpc.system.name`, `rpc.method`, and `rpc.response.status_code`, names spans with the full gRPC method + * name, injects context into ASCII gRPC metadata, and classifies gRPC client errors. + */ + def default: Config = { + ConfigImpl( + Defaults.tracerName, + Defaults.textMapUpdater, + Defaults.spanName, + Defaults.attributes, + Defaults.finalizationStrategy + ) + } + + private final case class ConfigImpl( + tracerName: String, + textMapUpdater: TextMapUpdater[Metadata], + spanName: (AspectOperation, ClientCallContext[?, ?, Metadata]) => String, + attributes: (AspectOperation, ClientCallContext[?, ?, Metadata]) => Attributes, + finalizationStrategy: (AspectOperation, ClientCallContext[?, ?, Metadata]) => SpanFinalizer.Strategy + ) extends Config { + def withTracerName(tracerName: String): Config = + copy(tracerName = tracerName) + + def withTextMapUpdater(textMapUpdater: TextMapUpdater[Metadata]): Config = + copy(textMapUpdater = textMapUpdater) + + def withSpanName(spanName: (AspectOperation, ClientCallContext[?, ?, Metadata]) => String): Config = + copy(spanName = spanName) + + def withAttributes(attributes: (AspectOperation, ClientCallContext[?, ?, Metadata]) => Attributes): Config = + copy(attributes = attributes) + + def withServerAddress(serverAddress: String, serverPort: Option[Int]): Config = + copy(attributes = + (operation, ctx) => + attributes(operation, ctx) ++ serverPort.fold( + Attributes(ServerAttributes.ServerAddress(serverAddress)) + ) { port => + Attributes( + ServerAttributes.ServerAddress(serverAddress), + ServerAttributes.ServerPort(port.toLong) + ) + } + ) + + def withFinalizationStrategy( + finalizationStrategy: (AspectOperation, ClientCallContext[?, ?, Metadata]) => SpanFinalizer.Strategy + ): Config = + copy(finalizationStrategy = finalizationStrategy) + } + + } + + /** Creates a client tracing aspect using [[Config.default]]. + * + * Defaults follow the OpenTelemetry gRPC semantic conventions for span kind, span name, `rpc.system.name`, + * `rpc.method`, and `rpc.response.status_code`. + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/grpc/ OpenTelemetry Semantic conventions for gRPC]] + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans OpenTelemetry Semantic conventions for RPC spans]] + */ + def create[F[_]: MonadCancelThrow: TracerProvider]: F[ClientAspect[F, F, Metadata]] = + create(Config.default) + + /** Creates a client tracing aspect using the supplied configuration. + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/grpc/ OpenTelemetry Semantic conventions for gRPC]] + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans OpenTelemetry Semantic conventions for RPC spans]] + */ + def create[F[_]: MonadCancelThrow: TracerProvider](config: Config): F[ClientAspect[F, F, Metadata]] = + TracerProvider[F].tracer(config.tracerName).withVersion(BuildInfo.version).get.map { implicit tracer => + implicit val textMapUpdater: TextMapUpdater[Metadata] = config.textMapUpdater + + new TraceClientAspect[F]( + config.spanName, + config.attributes, + config.finalizationStrategy + ) + } + +} diff --git a/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TraceServiceAspect.scala b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TraceServiceAspect.scala new file mode 100644 index 00000000..384d9ab2 --- /dev/null +++ b/otel4s-trace/src/main/scala/fs2/grpc/otel4s/trace/TraceServiceAspect.scala @@ -0,0 +1,312 @@ +/* + * Copyright (c) 2018 Gary Coady / Fs2 Grpc Developers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.grpc.otel4s.trace + +import cats.effect.MonadCancelThrow +import cats.syntax.all._ +import fs2.Stream +import fs2.grpc.server._ +import io.grpc.Metadata +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.context.propagation.TextMapGetter +import org.typelevel.otel4s.trace._ +import fs2.grpc.BuildInfo +import org.typelevel.otel4s.semconv.attributes.ServerAttributes + +import scala.util.chaining._ + +private class TraceServiceAspect[F[_]: MonadCancelThrow: Tracer]( + spanName: (AspectOperation, ServiceCallContext[?, ?]) => String, + attributes: (AspectOperation, ServiceCallContext[?, ?]) => Attributes, + finalizationStrategy: (AspectOperation, ServiceCallContext[?, ?]) => SpanFinalizer.Strategy +)(implicit textMapGetter: TextMapGetter[Metadata]) + extends ServiceAspect[F, F, Metadata] { + + def visitUnaryToUnaryCall[Req, Res]( + callCtx: ServiceCallContext[Req, Res], + req: Req, + run: (Req, Metadata) => F[Res] + ): F[Res] = + joinOrRoot(AspectOperation.UnaryToUnaryCall, callCtx, run(req, callCtx.metadata)) + + def visitUnaryToStreamingCall[Req, Res]( + callCtx: ServiceCallContext[Req, Res], + req: Req, + run: (Req, Metadata) => Stream[F, Res] + ): Stream[F, Res] = + Stream.eval(Tracer[F].joinOrRoot(callCtx.metadata)(Tracer[F].currentSpanContext)).flatMap { ctx => + Stream.resource(span(AspectOperation.UnaryToStreamingCall, callCtx, ctx).resource).flatMap { res => + run(req, callCtx.metadata).translate(res.trace) + } + } + + def visitStreamingToUnaryCall[Req, Res]( + callCtx: ServiceCallContext[Req, Res], + req: Stream[F, Req], + run: (Stream[F, Req], Metadata) => F[Res] + ): F[Res] = + joinOrRoot(AspectOperation.StreamingToUnaryCall, callCtx, run(req, callCtx.metadata)) + + def visitStreamingToStreamingCall[Req, Res]( + callCtx: ServiceCallContext[Req, Res], + req: Stream[F, Req], + run: (Stream[F, Req], Metadata) => Stream[F, Res] + ): Stream[F, Res] = + Stream.eval(Tracer[F].joinOrRoot(callCtx.metadata)(Tracer[F].currentSpanContext)).flatMap { ctx => + Stream.resource(span(AspectOperation.StreamingToStreamingCall, callCtx, ctx).resource).flatMap { res => + run(req, callCtx.metadata).translate(res.trace) + } + } + + def visitUnaryToUnaryCallTrailers[Req, Res]( + callCtx: ServiceCallContext[Req, Res], + req: Req, + run: (Req, Metadata) => F[(Res, Metadata)] + ): F[(Res, Metadata)] = + joinOrRoot(AspectOperation.UnaryToUnaryCallTrailers, callCtx, run(req, callCtx.metadata)) + + def visitStreamingToUnaryCallTrailers[Req, Res]( + callCtx: ServiceCallContext[Req, Res], + req: Stream[F, Req], + run: (Stream[F, Req], Metadata) => F[(Res, Metadata)] + ): F[(Res, Metadata)] = + joinOrRoot(AspectOperation.StreamingToUnaryCallTrailers, callCtx, run(req, callCtx.metadata)) + + private def joinOrRoot[A](operation: AspectOperation, callCtx: ServiceCallContext[?, ?], fa: => F[A]): F[A] = + MonadCancelThrow[F].uncancelable { poll => + Tracer[F].joinOrRoot(callCtx.metadata) { + span(operation, callCtx, None).surround(poll(fa)) + } + } + + private def span(operation: AspectOperation, ctx: ServiceCallContext[?, ?], parent: Option[SpanContext]) = + Tracer[F] + .spanBuilder(spanName(operation, ctx)) + .addAttributes(attributes(operation, ctx)) + .withSpanKind(SpanKind.Server) + .withFinalizationStrategy(finalizationStrategy(operation, ctx)) + .pipe(b => parent.fold(b)(b.withParent(_))) + .build + +} + +object TraceServiceAspect { + + /** Configuration for service tracing. + * + * The default configuration follows OpenTelemetry gRPC semantic conventions for the attributes this aspect can + * determine from fs2-grpc call context. + * + * Use [[withServerAddress]] to add `server.address` and optional `server.port`, since the server bind address is not + * available to the aspect. + * + * @example + * {{{ + * val config = + * TraceServiceAspect.Config.default + * .withSpanName((_, ctx) => ctx.methodDescriptor.getFullMethodName) + * }}} + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/grpc/ OpenTelemetry Semantic conventions for gRPC]] + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans OpenTelemetry Semantic conventions for RPC spans]] + */ + trait Config { + + /** Instrumentation scope name used to obtain the tracer. */ + def tracerName: String + + /** Text map getter used to extract context from incoming gRPC metadata. */ + def textMapGetter: TextMapGetter[Metadata] + + /** Function used to name service spans. */ + def spanName: (AspectOperation, ServiceCallContext[?, ?]) => String + + /** Function used to add attributes when service spans are created. */ + def attributes: (AspectOperation, ServiceCallContext[?, ?]) => Attributes + + /** Strategy used to record status, errors, and exceptions when service spans end. */ + def finalizationStrategy: (AspectOperation, ServiceCallContext[?, ?]) => SpanFinalizer.Strategy + + /** Sets the instrumentation scope name used to obtain the tracer. */ + def withTracerName(tracerName: String): Config + + /** Sets the text map getter used to extract context from incoming gRPC metadata. */ + def withTextMapGetter(textMapGetter: TextMapGetter[Metadata]): Config + + /** Sets the span name function. + * + * The default is the full gRPC method name, for example `com.example.EchoService/Echo`. + */ + def withSpanName(spanName: (AspectOperation, ServiceCallContext[?, ?]) => String): Config + + /** Replaces the span attribute function. + * + * You must include `rpc.system.name` and `rpc.method` manually when replacing the defaults. + * + * @example + * {{{ + * val config = + * TraceServiceAspect.Config.default.withAttributes { (_, ctx) => + * Attributes( + * CommonAttributes.rpcSystem, + * CommonAttributes.rpcMethod(ctx.methodDescriptor.getFullMethodName) + * ) + * } + * }}} + */ + def withAttributes(attributes: (AspectOperation, ServiceCallContext[?, ?]) => Attributes): Config + + /** Adds `server.address` and, when provided, `server.port` to service spans. + * + * Use values derived from the logical server address, not connection-level peer information. + * + * @example + * {{{ + * val withPort = TraceServiceAspect.Config.default.withServerAddress("grpc.io", Some(50051)) + * val socket = TraceServiceAspect.Config.default.withServerAddress("/run/service.sock", None) + * }}} + */ + def withServerAddress(serverAddress: String, serverPort: Option[Int]): Config + + /** Sets the span finalization strategy used to record status, errors, and exceptions. */ + def withFinalizationStrategy( + finalizationStrategy: (AspectOperation, ServiceCallContext[?, ?]) => SpanFinalizer.Strategy + ): Config + } + + object Config { + + object Defaults { + val tracerName: String = "fs2.grpc" + + val textMapGetter: TextMapGetter[Metadata] = TextMapGetters.asciiStringMetadataTextMapGetter + + val spanName: (AspectOperation, ServiceCallContext[?, ?]) => String = + (_, ctx) => ctx.methodDescriptor.getFullMethodName + + val attributes: (AspectOperation, ServiceCallContext[?, ?]) => Attributes = + (_, ctx) => + Attributes( + CommonAttributes.rpcSystem, + CommonAttributes.rpcMethod(ctx.methodDescriptor.getFullMethodName) + ) + + val finalizationStrategy: (AspectOperation, ServiceCallContext[?, ?]) => SpanFinalizer.Strategy = + (_, _) => GrpcSpanFinalizers.server + } + + /** Default service tracing configuration. + * + * Emits `rpc.system.name`, `rpc.method`, and `rpc.response.status_code`, names spans with the full gRPC method + * name, extracts context from ASCII gRPC metadata, and classifies gRPC server errors. + */ + def default: Config = { + ConfigImpl( + Defaults.tracerName, + Defaults.textMapGetter, + Defaults.spanName, + Defaults.attributes, + Defaults.finalizationStrategy + ) + } + + private final case class ConfigImpl( + tracerName: String, + textMapGetter: TextMapGetter[Metadata], + spanName: (AspectOperation, ServiceCallContext[?, ?]) => String, + attributes: (AspectOperation, ServiceCallContext[?, ?]) => Attributes, + finalizationStrategy: (AspectOperation, ServiceCallContext[?, ?]) => SpanFinalizer.Strategy + ) extends Config { + def withTracerName(tracerName: String): Config = + copy(tracerName = tracerName) + + def withTextMapGetter(textMapGetter: TextMapGetter[Metadata]): Config = + copy(textMapGetter = textMapGetter) + + def withSpanName( + spanName: (AspectOperation, ServiceCallContext[?, ?]) => String + ): Config = + copy(spanName = spanName) + + def withAttributes( + attributes: (AspectOperation, ServiceCallContext[?, ?]) => Attributes + ): Config = + copy(attributes = attributes) + + def withServerAddress(serverAddress: String, serverPort: Option[Int]): Config = + copy(attributes = + (operation, ctx) => + attributes(operation, ctx) ++ serverPort.fold( + Attributes(ServerAttributes.ServerAddress(serverAddress)) + ) { port => + Attributes( + ServerAttributes.ServerAddress(serverAddress), + ServerAttributes.ServerPort(port.toLong) + ) + } + ) + + def withFinalizationStrategy( + finalizationStrategy: (AspectOperation, ServiceCallContext[?, ?]) => SpanFinalizer.Strategy + ): Config = + copy(finalizationStrategy = finalizationStrategy) + } + + } + + /** Creates a service tracing aspect using [[Config.default]]. + * + * Defaults follow the OpenTelemetry gRPC semantic conventions for span kind, span name, `rpc.system.name`, + * `rpc.method`, and `rpc.response.status_code`. + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/grpc/ OpenTelemetry Semantic conventions for gRPC]] + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans OpenTelemetry Semantic conventions for RPC spans]] + */ + def create[F[_]: MonadCancelThrow: TracerProvider]: F[ServiceAspect[F, F, Metadata]] = + create(Config.default) + + /** Creates a service tracing aspect using the supplied configuration. + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/grpc/ OpenTelemetry Semantic conventions for gRPC]] + * + * @see + * [[https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans OpenTelemetry Semantic conventions for RPC spans]] + */ + def create[F[_]: MonadCancelThrow: TracerProvider](config: Config): F[ServiceAspect[F, F, Metadata]] = + TracerProvider[F].tracer(config.tracerName).withVersion(BuildInfo.version).get.map { implicit tracer => + implicit val textMapGetter: TextMapGetter[Metadata] = config.textMapGetter + + new TraceServiceAspect[F]( + config.spanName, + config.attributes, + config.finalizationStrategy + ) + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7f03f71a..97276c51 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -10,6 +10,7 @@ object Dependencies { val fs2 = "3.13.0" val catsEffect = "3.7.0" val ceMunit = "2.2.0" + val otel4s = "1.0.0-RC1" val sbtProtoc = "1.0.8" @@ -19,6 +20,8 @@ object Dependencies { val fs2 = "co.fs2" %% "fs2-core" % versions.fs2 val catsEffect = "org.typelevel" %% "cats-effect" % versions.catsEffect + val otel4sCoreTrace = "org.typelevel" %% "otel4s-core-trace" % versions.otel4s + val otel4sSemconv = "org.typelevel" %% "otel4s-semconv" % versions.otel4s val grpcApi = "io.grpc" % "grpc-api" % versions.grpc // Testing @@ -26,6 +29,8 @@ object Dependencies { val ceTestkit = "org.typelevel" %% "cats-effect-testkit" % versions.catsEffect val ceMunit = "org.typelevel" %% "munit-cats-effect" % versions.ceMunit val grpcNetty = "io.grpc" % "grpc-netty-shaded" % versions.grpc + val otel4sOtelJavaTestkit = "org.typelevel" %% "otel4s-oteljava-testkit" % versions.otel4s + val otel4sSemconvExperimental = "org.typelevel" %% "otel4s-semconv-experimental" % versions.otel4s // Compiler & SBT Plugins