Linh Nguyen

Originally published at tuleism.github.io

OpenTelemetry Distributed Tracing with ZIO

Introduction

This post collects some quick notes on using ZIO and zio-telemetry to implement OpenTelemetry distributed tracing for Scala applications.
The source code is available here.

This is not an introduction to any of these technologies, but here are a few good reads:

Initial implementation

For demonstration purposes, we will manually instrument a modified version of zio-grpc's helloworld example, which incorporates both gRPC and HTTP communication:

  • Original: hello-client sends a HelloRequest with name x and hello-server returns a HelloResponse with message Hello, x.
  • Modified: in addition to the original behavior, client sends an optional integer field guess and server performs an HTTP request to HTTPBin based on its value.

Initial Diagram

Add the new field guess to HelloRequest (Int32Value comes from google/protobuf/wrappers.proto, which must be imported):

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
  google.protobuf.Int32Value guess = 2;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

Add zio-grpc dependency

resolvers += Resolver.sonatypeRepo("snapshots")

addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")

addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.4")

val zioGrpcVersion = "0.5.1+12-93cdbe22-SNAPSHOT"

libraryDependencies ++= Seq(
  "com.thesamet.scalapb.zio-grpc" %% "zio-grpc-codegen" % zioGrpcVersion,
  "com.thesamet.scalapb"          %% "compilerplugin"   % "0.11.5"
)

Set up build.sbt

  • Generate Scala code from helloworld.proto.
  • Depend on sttp for HTTP client.
val grpcVersion = "1.41.0"
val sttpVersion = "3.3.15"

val scalaPBRuntime = "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion

val grpcRuntimeDeps = Seq(
  "io.grpc"      % "grpc-netty" % grpcVersion,
  scalaPBRuntime,
  scalaPBRuntime % "protobuf"
)

val sttpZioDeps = Seq(
  "com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion
)

lazy val root = Project("opentelemetry-distributed-tracing-zio", file(".")).aggregate(zio)

lazy val zio = commonProject("zio").settings(
  Compile / PB.targets := Seq(
    scalapb.gen(grpc = true)          -> (Compile / sourceManaged).value,
    scalapb.zio_grpc.ZioCodeGenerator -> (Compile / sourceManaged).value
  ),
  libraryDependencies ++= grpcRuntimeDeps ++ sttpZioDeps
)

Client implementation

  • Create a gRPC client pointing to localhost:9000.
  • Send a single HelloRequest.
  • Send 5 HelloRequests in parallel.
  • Send a single HelloRequest with an invalid guess.
  • Print "Done" and exit.
object ZClient extends zio.App {
  private val clientLayer = GreeterClient.live(
    ZManagedChannel(
      ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext()
    )
  )

  private val singleHello = GreeterClient.sayHello(HelloRequest("World"))

  private val multipleHellos = ZIO.collectAllParN(5)(
    List(
      GreeterClient.sayHello(HelloRequest("1", Some(1))),
      GreeterClient.sayHello(HelloRequest("2", Some(2))),
      GreeterClient.sayHello(HelloRequest("3", Some(3))),
      GreeterClient.sayHello(HelloRequest("4", Some(4))),
      GreeterClient.sayHello(HelloRequest("5", Some(5)))
    )
  )

  private val invalidHello = GreeterClient.sayHello(HelloRequest("Invalid", Some(-1))).ignore

  private def myAppLogic =
    singleHello *> multipleHellos *> invalidHello *> putStrLn("Done")

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    myAppLogic.provideCustomLayer(clientLayer).exitCode
}

Server implementation

  • Fail the request if guess is less than 0.
  • Based on the value of guess, delay for some time and then send a request to HTTPBin.
type ZGreeterEnv = Clock with Random with SttpClient

object ZGreeterImpl extends RGreeter[ZGreeterEnv] {

  def sayHello(request: HelloRequest): ZIO[ZGreeterEnv, Status, HelloReply] = {
    val guess = request.guess.getOrElse(0)
    for {
      _      <- ZIO.fail(Status.INVALID_ARGUMENT).when(guess < 0)
      code   <- ???
      delayMs = ???
      _      <- httpRequest(code)
                  .delay(delayMs.millis)
                  .mapError(ex => Status.INTERNAL.withCause(ex))
    } yield HelloReply(s"Hello, ${request.name}")
  }

  def httpRequest(code: Int): RIO[SttpClient, Unit] =
    send(basicRequest.get(uri"https://httpbin.org/status/$code")).unit
}

Run it

  • To run the server:
$ sbt "zio/runMain com.github.tuleism.ZServer"

[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.
  • To run the client:
$ sbt "zio/runMain com.github.tuleism.ZClient"

[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s

At this point, we only know that it takes roughly 12 seconds for the client to initialize and finish its work.

Let's add distributed tracing to gain more insights into this.

Common tracing requirements

  • For both client and server, we need to acquire a Tracer, an object responsible for creating and managing Spans.
  • Tracing data is sent to Jaeger, which acts as a standalone collector.

Instrumented Diagram

Add new dependencies

val openTelemetryVersion = "1.6.0"
val zioConfigVersion     = "1.0.10"
val zioMagicVersion      = "0.3.9"
val zioTelemetryVersion  = "0.8.2"

val openTelemetryDeps = Seq(
  "io.opentelemetry" % "opentelemetry-exporter-jaeger"    % openTelemetryVersion,
  "io.opentelemetry" % "opentelemetry-sdk"                % openTelemetryVersion,
  "io.opentelemetry" % "opentelemetry-extension-noop-api" % s"$openTelemetryVersion-alpha"
)

val zioConfigDeps = Seq(
  "dev.zio" %% "zio-config"          % zioConfigVersion,
  "dev.zio" %% "zio-config-magnolia" % zioConfigVersion,
  "dev.zio" %% "zio-config-typesafe" % zioConfigVersion
)

val zioMagicDeps = Seq(
  "io.github.kitlangton" %% "zio-magic" % zioMagicVersion
)

val zioTelemetryDeps = Seq(
  "dev.zio"                       %% "zio-opentelemetry"                   % zioTelemetryVersion,
  "com.softwaremill.sttp.client3" %% "zio-telemetry-opentelemetry-backend" % sttpVersion
)

Add a config layer

tracing {
  enable = false
  enable = ${?TRACING_ENABLE}
  endpoint = "http://127.0.0.1:14250"
  endpoint = ${?JAEGER_ENDPOINT}
}

case class AppConfig(tracing: TracingConfig)

case class TracingConfig(enable: Boolean, endpoint: String)

object AppConfig {
  private val configDescriptor = descriptor[AppConfig]

  val live: Layer[ReadError[String], Has[AppConfig]] = TypesafeConfig.fromDefaultLoader(configDescriptor)
}
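The `${?TRACING_ENABLE}` and `${?JAEGER_ENDPOINT}` lines are HOCON optional substitutions: the default written just above each one stays in effect unless the environment variable is set. The plain-Scala sketch below only mimics that fallback rule for illustration. `TracingSettings`, `TracingDefaults`, and `resolve` are hypothetical names that are not part of the project, which reads its values through zio-config as shown above.

```scala
// Hypothetical illustration of the override rule; not part of the original project.
final case class TracingSettings(enable: Boolean, endpoint: String)

object TracingDefaults {
  // Defaults copied from the configuration file above.
  val default: TracingSettings =
    TracingSettings(enable = false, endpoint = "http://127.0.0.1:14250")

  // An environment variable, when present, overrides the default.
  // Simplification: an unparseable boolean falls back to the default here,
  // which is an assumption of this sketch and not a claim about the real loader.
  def resolve(env: Map[String, String]): TracingSettings =
    TracingSettings(
      enable = env.get("TRACING_ENABLE").flatMap(_.toBooleanOption).getOrElse(default.enable),
      endpoint = env.getOrElse("JAEGER_ENDPOINT", default.endpoint)
    )
}
```

Calling `TracingDefaults.resolve(sys.env)` would apply the rule to the real process environment.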

Add a Tracer layer

  • Depending on the configuration, we create either a noop Tracer or one that sends data to Jaeger.
  • Once we have it, we can construct a Tracing layer, which gives us access to many useful operations in zio-telemetry.
object ZTracer {
  private val InstrumentationName = "com.github.tuleism"

  private def managed(serviceName: String, endpoint: String) = {
    val resource = Resource.builder().put(ResourceAttributes.SERVICE_NAME, serviceName).build()
    for {
      spanExporter   <- ZManaged.fromAutoCloseable(
                          Task(JaegerGrpcSpanExporter.builder().setEndpoint(endpoint).build())
                        )
      spanProcessor  <- ZManaged.fromAutoCloseable(UIO(SimpleSpanProcessor.create(spanExporter)))
      tracerProvider <- UIO(
                          SdkTracerProvider.builder().addSpanProcessor(spanProcessor).setResource(resource).build()
                        ).toManaged_
      openTelemetry  <- UIO(OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()).toManaged_
      tracer         <- UIO(openTelemetry.getTracer(InstrumentationName)).toManaged_
    } yield tracer
  }

  def live(serviceName: String): RLayer[Has[TracingConfig], Has[Tracer]] =
    (
      for {
        config <- ZIO.service[TracingConfig].toManaged_
        tracer <- if (!config.enable) {
                    Task(NoopOpenTelemetry.getInstance().getTracer(InstrumentationName)).toManaged_
                  } else {
                    managed(serviceName, config.endpoint)
                  }
      } yield tracer
    ).toLayer
}

New server

Instrument the HTTP client

object SttpTracing {
  private val wrapper = new ZioTelemetryOpenTelemetryTracer {
    def before[T](request: Request[T, Nothing]): RIO[Tracing, Unit] =
      Tracing.setAttribute(SemanticAttributes.HTTP_METHOD.getKey, request.method.method) *>
        Tracing.setAttribute(SemanticAttributes.HTTP_URL.getKey, request.uri.toString()) *>
        ZIO.unit

    def after[T](response: Response[T]): RIO[Tracing, Unit] =
      Tracing.setAttribute(SemanticAttributes.HTTP_STATUS_CODE.getKey, response.code.code) *>
        ZIO.unit
  }

  val live = AsyncHttpClientZioBackend.layer().flatMap { hasBackend =>
    ZIO
      .service[Tracing.Service]
      .map { tracing =>
        ZioTelemetryOpenTelemetryBackend(hasBackend.get, tracing, wrapper)
      }
      .toLayer
  }
}

Instrument the gRPC server

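We can add tracing without changing our server implementation by using a ZTransform. For each request, it extracts the parent context from the gRPC Metadata, starts a server Span named after the full method name, and records the RPC system and gRPC status code as attributes: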

object GrpcTracing {
  private val propagator: TextMapPropagator = W3CTraceContextPropagator.getInstance()

  private val metadataGetter: TextMapGetter[Metadata] = new TextMapGetter[Metadata] {
    override def keys(carrier: Metadata): java.lang.Iterable[String] =
      carrier.keys()

    override def get(carrier: Metadata, key: String): String =
      carrier.get(
        Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
      )
  }

  private def withSemanticAttributes[R, A](effect: ZIO[R, Status, A]): ZIO[Tracing with R, Status, A] =
    Tracing.setAttribute(SemanticAttributes.RPC_SYSTEM.getKey, "grpc") *>
      effect
        .tapBoth(
          status =>
            Tracing.setAttribute(
              SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey,
              status.getCode.value()
            ),
          _ =>
            Tracing.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey, Status.OK.getCode.value())
        )

  def serverTracingTransform[R]: ZTransform[R, Status, R with Tracing with Has[RequestContext]] =
    new ZTransform[R, Status, R with Tracing with Has[RequestContext]] {

      def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing with Has[RequestContext], Status, A] =
        for {
          rc       <- ZIO.service[RequestContext]
          metadata <- rc.metadata.wrap(identity)
          result   <- withSemanticAttributes(io)
                        .spanFrom(
                          propagator,
                          metadata,
                          metadataGetter,
                          rc.methodDescriptor.getFullMethodName,
                          SpanKind.SERVER,
                          { case _ => StatusCode.ERROR }
                        )
        } yield result

      def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing with Has[RequestContext], Status, A] =
        ???
    }
}
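W3CTraceContextPropagator reads the parent context from a `traceparent` metadata entry of the form `version-traceId-parentId-flags`. As a rough illustration of what extraction involves, the standard-library sketch below parses such a value. `TraceParent` and `parse` are hypothetical names, only version `00` is handled, and real code should keep relying on the propagator.

```scala
// Hypothetical helper for illustration; the application itself uses W3CTraceContextPropagator.
final case class TraceParent(traceId: String, parentId: String, sampled: Boolean)

object TraceParent {
  // Version "00", 32 hex chars of trace id, 16 hex chars of parent span id, 2 hex chars of flags.
  private val Pattern = "00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})".r

  def parse(header: String): Option[TraceParent] =
    header match {
      // All-zero ids are invalid, so require at least one non-zero digit in each.
      case Pattern(traceId, parentId, flags) if traceId.exists(_ != '0') && parentId.exists(_ != '0') =>
        // The lowest bit of the flags byte is the "sampled" flag.
        Some(TraceParent(traceId, parentId, (Integer.parseInt(flags, 16) & 1) == 1))
      case _ =>
        None
    }
}
```

A malformed or missing header yields `None`, which corresponds to starting a new trace instead of continuing the caller's.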

Update Server Main

  • Add required layers for Tracing.
  • Transform the original ZGreeterImpl.
import zio.magic._

object ZServer extends ServerMain {
  private val requirements =
    ZLayer
      .wire[ZEnv with ZGreeterEnv](
        ZEnv.live,
        AppConfig.live.narrow(_.tracing),
        ZTracer.live("hello-server"),
        Tracing.live,
        SttpTracing.live
      )
      .orDie

  def services: ServiceList[Any] =
    ServiceList
      .add(ZGreeterImpl.transform[ZGreeterEnv, Has[RequestContext]](GrpcTracing.serverTracingTransform))
      .provideLayer(requirements)
}

New client

Inject current context into gRPC Metadata for context propagation

object GrpcTracing {

  ...

  private val metadataSetter: TextMapSetter[Metadata] = (carrier, key, value) =>
    carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)

  val contextPropagationClientInterceptor: ZClientInterceptor[Tracing] = ZClientInterceptor.headersUpdater {
    (_, _, metadata) =>
      metadata.wrapM(Tracing.inject(propagator, _, metadataSetter))
  }

  ...

}
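Injection is the mirror image of the server-side extraction: the propagator writes the current span context into the outgoing metadata as a `traceparent` entry. Below is a minimal sketch of that formatting step, using a plain mutable map as a stand-in carrier. `TraceParentInjector` and `inject` are hypothetical names, and the real carrier is the gRPC Metadata handled by `metadataSetter` above.

```scala
import scala.collection.mutable

// Hypothetical helper for illustration; the application uses Tracing.inject with the propagator.
object TraceParentInjector {
  // Writes a W3C traceparent entry into the carrier, skipping malformed ids.
  def inject(
    carrier: mutable.Map[String, String],
    traceId: String,
    spanId: String,
    sampled: Boolean
  ): Unit = {
    val wellFormed = traceId.matches("[0-9a-f]{32}") && spanId.matches("[0-9a-f]{16}")
    if (wellFormed) {
      val flags = if (sampled) "01" else "00"
      carrier("traceparent") = s"00-$traceId-$spanId-$flags"
    }
  }
}
```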
object ZClient extends zio.App {
  private val clientLayer = GreeterClient.live(
    ZManagedChannel(
      ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext(),
      Seq(GrpcTracing.contextPropagationClientInterceptor)
    )
  )

  ...
}

Start a Span for each request

  • Use ZTransform to record the relevant gRPC attributes.
object GrpcTracing {

  ...

  def clientTracingTransform[R]: ZTransform[R, Status, R with Tracing] =
    new ZTransform[R, Status, R with Tracing] {
      def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing, Status, A] = withSemanticAttributes(io)

      def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing, Status, A] = ???
    }
}
  • Unlike the server, we don't have access to a RequestContext object, so we have to set the method name manually.
  • We also start additional Spans.
object ZClient extends zio.App {

  ...


  private def errorToStatusCode[E]: PartialFunction[E, StatusCode] = { case _ => StatusCode.ERROR }

  private def sayHello(request: HelloRequest) =
    GreeterClient
      .sayHello(request)
      .span(
        GreeterGrpc.METHOD_SAY_HELLO.getFullMethodName,
        SpanKind.CLIENT,
        errorToStatusCode
      )

  private val singleHello = sayHello(HelloRequest("World"))
    .span("singleHello", toErrorStatus = errorToStatusCode)

  private val multipleHellos = ZIO
    .collectAllParN(5)(
      List(
        sayHello(HelloRequest("1", Some(1))),
        sayHello(HelloRequest("2", Some(2))),
        sayHello(HelloRequest("3", Some(3))),
        sayHello(HelloRequest("4", Some(4))),
        sayHello(HelloRequest("5", Some(5)))
      )
    )
    .span("multipleHellos", toErrorStatus = errorToStatusCode)

  private val invalidHello = sayHello(HelloRequest("Invalid", Some(-1))).ignore
    .span("invalidHello", toErrorStatus = errorToStatusCode)
}

Add required layers

object ZClient extends zio.App {

  ...

  private val requirements = ZLayer
    .wire[ZEnv with Tracing](
      ZEnv.live,
      AppConfig.live.narrow(_.tracing),
      ZTracer.live("hello-client"),
      Tracing.live
    ) >+> clientLayer

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    myAppLogic.provideCustomLayer(requirements).exitCode
}

Showtime

Run Jaeger through Docker

$ docker run --rm --name jaeger \
  -p 16686:16686 \
  -p 14250:14250 \
  jaegertracing/all-in-one:1.25

Start the server

$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZServer"

[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.

Start the client

$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZClient"

[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s

Distributed Tracing in action!

  • Now we can see the details for multipleHellos:

multipleHellos Span

  • We can also see which guess causes the longest delay.

bad guess

Integration with Logging

Add logging dependency

val izumiVersion         = "1.0.8"

val loggingDeps = Seq(
  "io.7mind.izumi" %% "logstage-core"          % izumiVersion,
  "io.7mind.izumi" %% "logstage-adapter-slf4j" % izumiVersion
)

Set up logging

  • Add trace_id, span_id, and trace_flags to the logging context if the current trace context is valid.
object Logging {
  private def baseLogger = IzLogger()

  val live: ZLayer[Has[Tracing.Service], Nothing, Has[LogZIO.Service]] =
    (
      for {
        tracing <- ZIO.service[Tracing.Service]
      } yield LogZIO.withDynamicContext(baseLogger)(
        Tracing.getCurrentSpanContext
          .map(spanContext =>
            if (spanContext.isValid)
              CustomContext(
                "trace_id"    -> spanContext.getTraceId,
                "span_id"     -> spanContext.getSpanId,
                "trace_flags" -> spanContext.getTraceFlags.asHex()
              )
            else
              CustomContext.empty
          )
          .provide(Has(tracing))
      )
    ).toLayer
}
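The `isValid` check matters because, outside of any Span, the current span context carries all-zero IDs that would only add noise to the logs. The sketch below restates the rule with plain Scala types. `SpanIds` and `LogContext` are hypothetical stand-ins for OpenTelemetry's SpanContext and LogStage's CustomContext, and the validity check is simplified to "not all zeros".

```scala
// Hypothetical stand-in for OpenTelemetry's SpanContext.
final case class SpanIds(traceId: String, spanId: String, traceFlags: String) {
  // Simplified validity rule: both ids contain at least one non-zero digit.
  def isValid: Boolean = traceId.exists(_ != '0') && spanId.exists(_ != '0')
}

object LogContext {
  // Returns the fields to attach to a log entry, or nothing when no Span is active.
  def fields(ids: SpanIds): Map[String, String] =
    if (ids.isValid)
      Map(
        "trace_id"    -> ids.traceId,
        "span_id"     -> ids.spanId,
        "trace_flags" -> ids.traceFlags
      )
    else
      Map.empty
}
```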

Add a few log messages

  • For example, in singleHello:
object ZClient extends zio.App {

  ...

  private val singleHello = (
    for {
      _ <- log.info("singleHello")
      _ <- sayHello(HelloRequest("World"))
    } yield ()
  ).span("singleHello", toErrorStatus = errorToStatusCode)

}

Sample Logs

[info] running (fork) com.github.tuleism.ZClient
[info] I 2021-11-01T22:59:10.881 (ZClient.scala:37)  …tuleism.ZClient.singleHello [24:zio-default-async-11] trace_id=9c8a7ebb87381293bc8937a5f7673cb9, span_id=cb7c9a440472e1be, trace_flags=01 singleHello
[info] I 2021-11-01T22:59:14.064 (ZClient.scala:44)  …eism.ZClient.multipleHellos [21:zio-default-async-8 ] trace_id=fe405246fbaa5f876c19f14fa649a99f, span_id=bef19494bef4106e, trace_flags=01 multipleHellos
[info] I 2021-11-01T22:59:18.171 (ZClient.scala:60)  …uleism.ZClient.invalidHello [26:zio-default-async-13] trace_id=be5ccd425e0cfb01fd97274abd0c4d72, span_id=ea6499fb9a7c8d28, trace_flags=01 invalidHello
[info] I 2021-11-01T22:59:18.272 (ZClient.scala:66)  ….tuleism.ZClient.myAppLogic [15:zio-default-async-2 ] Done
[success] Total time: 12 s
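With the IDs present in every log line, a trace found in Jaeger can be matched to its logs and vice versa. As a small illustration, assuming the `key=value` layout of the sample output above, the IDs can be pulled out of a line with a regular expression. `LogIds` and `extract` are hypothetical names.

```scala
// Hypothetical helper for illustration; it assumes the log layout shown in the sample above.
object LogIds {
  private val TraceId = "trace_id=([0-9a-f]{32})".r
  private val SpanId  = "span_id=([0-9a-f]{16})".r

  // Returns (traceId, spanId) when both are present in the log line.
  def extract(line: String): Option[(String, String)] =
    for {
      t <- TraceId.findFirstMatchIn(line)
      s <- SpanId.findFirstMatchIn(line)
    } yield (t.group(1), s.group(1))
}
```

Lines logged outside of a Span, such as the final "Done" message, carry no IDs and yield `None`.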

Extra notes

  • If we receive an HTTP 5xx response, we should set the Span status to error according to the semantic conventions. However, this is currently not possible with zio-telemetry.
  • We need a better way to implement tracing for zio-grpc client.
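For the first note, the mapping itself is small. The sketch below encodes it as we understand the OpenTelemetry HTTP semantic conventions for client Spans: 5xx responses become errors, and so do 4xx responses on the client side, while everything else is left unset. `SpanStatus` and `forHttpClient` are hypothetical names, not zio-telemetry or sttp API, and the 4xx rule should be checked against the convention version in use.

```scala
// Hypothetical types for illustration; not zio-telemetry or sttp API.
sealed trait SpanStatus

object SpanStatus {
  case object Unset extends SpanStatus
  case object Error extends SpanStatus

  // Assumed mapping for client Spans: 4xx and 5xx are errors, anything else stays unset.
  def forHttpClient(statusCode: Int): SpanStatus =
    if (statusCode >= 400 && statusCode <= 599) Error else Unset
}
```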
