Skip to content

Commit

Permalink
Merge pull request #2579 from softwaremill/fix-vertx-metrics
Browse files Browse the repository at this point in the history
Fix metric collection in vertx and netty
  • Loading branch information
adamw authored Nov 18, 2022
2 parents a26b26b + dabf537 commit f3ce2b1
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@ import sttp.monad.MonadError
import sttp.tapir.integ.cats.CatsMonadError
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.Route
import sttp.tapir.server.netty.internal.NettyServerInterpreter
import sttp.tapir.server.netty.internal.{NettyServerInterpreter, RunAsync}

trait NettyCatsServerInterpreter[F[_]] {
implicit def async: Async[F]
def nettyServerOptions: NettyCatsServerOptions[F, _]

def toRoute(ses: List[ServerEndpoint[Any, F]]): Route[F] = {
implicit val monad: MonadError[F] = new CatsMonadError[F]
NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile)
val runAsync = new RunAsync[F] {
override def apply[T](f: => F[T]): Unit = nettyServerOptions.dispatcher.unsafeRunAndForget(f)
}
NettyServerInterpreter.toRoute(
ses,
nettyServerOptions.interceptors,
nettyServerOptions.createFile,
nettyServerOptions.deleteFile,
runAsync
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package sttp.tapir.server.netty

import sttp.monad.FutureMonad
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.internal.NettyServerInterpreter
import sttp.tapir.server.netty.NettyFutureServerInterpreter.FutureRunAsync
import sttp.tapir.server.netty.internal.{NettyServerInterpreter, RunAsync}

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -13,7 +14,13 @@ trait NettyFutureServerInterpreter {
ses: List[ServerEndpoint[Any, Future]]
)(implicit ec: ExecutionContext): FutureRoute = {
implicit val monad: FutureMonad = new FutureMonad()
NettyServerInterpreter.toRoute(ses, nettyServerOptions.interceptors, nettyServerOptions.createFile, nettyServerOptions.deleteFile)
NettyServerInterpreter.toRoute(
ses,
nettyServerOptions.interceptors,
nettyServerOptions.createFile,
nettyServerOptions.deleteFile,
FutureRunAsync
)
}
}

Expand All @@ -23,4 +30,8 @@ object NettyFutureServerInterpreter {
override def nettyServerOptions: NettyFutureServerOptions[_] = serverOptions
}
}

private object FutureRunAsync extends RunAsync[Future] {
override def apply[T](f: => Future[T]): Unit = f
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import sttp.tapir.server.netty.NettyResponse

import scala.util.{Failure, Success, Try}

class NettyBodyListener[F[_]](implicit m: MonadError[F]) extends BodyListener[F, NettyResponse] {
class NettyBodyListener[F[_]](runAsync: RunAsync[F])(implicit m: MonadError[F]) extends BodyListener[F, NettyResponse] {
override def onComplete(body: NettyResponse)(cb: Try[Unit] => F[Unit]): F[NettyResponse] = {
m.eval((ctx: ChannelHandlerContext) => {
val nettyResponseContent = body(ctx)
nettyResponseContent.channelPromise.addListener((future: Future[_ >: Void]) => {
if (future.isSuccess) {
cb(Success(()))
runAsync(cb(Success(())))
} else {
cb(Failure(future.cause()))
runAsync(cb(Failure(future.cause())))
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ object NettyServerInterpreter {
ses: List[ServerEndpoint[Any, F]],
interceptors: List[Interceptor[F]],
createFile: ServerRequest => F[TapirFile],
deleteFile: TapirFile => F[Unit]
deleteFile: TapirFile => F[Unit],
runAsync: RunAsync[F]
): Route[F] = {
implicit val bodyListener: BodyListener[F, NettyResponse] = new NettyBodyListener
implicit val bodyListener: BodyListener[F, NettyResponse] = new NettyBodyListener(runAsync)
val serverInterpreter = new ServerInterpreter[Any, F, NettyResponse, NoStreams](
FilterServerEndpoints(ses),
new NettyRequestBody(createFile),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package sttp.tapir.server.netty.internal

trait RunAsync[F[_]] {
def apply[T](f: => F[T]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ object ServerMetricsTest {
}

def newRequestCounter[F[_]]: Metric[F, Counter] =
Metric[F, Counter](Counter(), onRequest = { (_, c, m) => m.unit(EndpointMetric().onEndpointRequest { _ => m.unit(c.++()) }) })
Metric[F, Counter](Counter(), onRequest = { (_, c, m) => m.unit(EndpointMetric().onEndpointRequest { _ => m.eval(c.++()) }) })

def newResponseCounter[F[_]]: Metric[F, Counter] =
Metric[F, Counter](Counter(), onRequest = { (_, c, m) => m.unit(EndpointMetric().onResponseBody { (_, _) => m.unit(c.++()) }) })
Metric[F, Counter](Counter(), onRequest = { (_, c, m) => m.unit(EndpointMetric().onResponseBody { (_, _) => m.eval(c.++()) }) })
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.VertxBodyListener
import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, monadError}
import sttp.tapir.server.vertx.cats.VertxCatsServerInterpreter.{CatsFFromVFuture, CatsRunAsync, monadError}
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.streams.ReadStreamCompatible
import sttp.tapir.server.vertx.cats.streams.fs2.fs2ReadStreamCompatible
Expand Down Expand Up @@ -47,7 +47,8 @@ trait VertxCatsServerInterpreter[F[_]] extends CommonServerInterpreter {
readStreamCompatible: ReadStreamCompatible[S]
): Handler[RoutingContext] = {
implicit val monad: MonadError[F] = monadError[F]
implicit val bodyListener: BodyListener[F, RoutingContext => Future[Void]] = new VertxBodyListener[F]
implicit val bodyListener: BodyListener[F, RoutingContext => Future[Void]] =
new VertxBodyListener[F](new CatsRunAsync(vertxCatsServerOptions.dispatcher))
val fFromVFuture = new CatsFFromVFuture[F]
val interpreter: ServerInterpreter[Fs2Streams[F], F, RoutingContext => Future[Void], S] = new ServerInterpreter(
_ => List(e),
Expand Down Expand Up @@ -126,6 +127,10 @@ object VertxCatsServerInterpreter {
def apply[T](f: => Future[T]): F[T] = f.asF
}

private[cats] class CatsRunAsync[F[_]: Async](dispatcher: Dispatcher[F]) extends RunAsync[F] {
override def apply[T](f: => F[T]): Unit = dispatcher.unsafeRunAndForget(f)
}

implicit class VertxFutureToCatsF[A](f: => Future[A]) {
def asF[F[_]: Async]: F[A] = {
Async[F].async_ { cb =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import io.vertx.core.Future
import io.vertx.ext.web.RoutingContext
import sttp.monad.MonadError
import sttp.tapir.server.interpreter.BodyListener
import sttp.tapir.server.vertx.interpreters.RunAsync

import scala.util.{Failure, Success, Try}

class VertxBodyListener[F[_]](implicit m: MonadError[F]) extends BodyListener[F, RoutingContext => Future[Void]] {
class VertxBodyListener[F[_]](runAsync: RunAsync[F])(implicit m: MonadError[F]) extends BodyListener[F, RoutingContext => Future[Void]] {
override def onComplete(body: RoutingContext => Future[Void])(cb: Try[Unit] => F[Unit]): F[RoutingContext => Future[Void]] = {
m.unit {
{ (ctx: RoutingContext) =>
body {
ctx.addBodyEndHandler(_ => cb(Success(())))
ctx.addEndHandler(res => if (res.failed()) cb(Failure(res.cause())))
ctx.addBodyEndHandler(_ => runAsync(cb(Success(()))))
ctx.addEndHandler(res => if (res.failed()) runAsync(cb(Failure(res.cause()))))
ctx
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import sttp.monad.FutureMonad
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.VertxFutureServerInterpreter.FutureFromVFuture
import sttp.tapir.server.vertx.VertxFutureServerInterpreter.{FutureFromVFuture, FutureRunAsync}
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.streams.{VertxStreams, ReadStreamCompatible}
import sttp.tapir.server.vertx.streams.{ReadStreamCompatible, VertxStreams}

import scala.concurrent.{ExecutionContext, Future, Promise}

Expand Down Expand Up @@ -48,7 +48,7 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter {
): Handler[RoutingContext] = { rc =>
implicit val ec: ExecutionContext = vertxFutureServerOptions.executionContextOrCurrentCtx(rc)
implicit val monad: FutureMonad = new FutureMonad()
implicit val bodyListener: BodyListener[Future, RoutingContext => VFuture[Void]] = new VertxBodyListener[Future]
implicit val bodyListener: BodyListener[Future, RoutingContext => VFuture[Void]] = new VertxBodyListener[Future](FutureRunAsync)
val reactiveStreamsReadStream: ReadStreamCompatible[VertxStreams] = streams.reactiveStreamsReadStreamCompatible()
val interpreter = new ServerInterpreter[VertxStreams, Future, RoutingContext => VFuture[Void], VertxStreams](
_ => List(e),
Expand Down Expand Up @@ -86,6 +86,10 @@ object VertxFutureServerInterpreter {
def apply[T](f: => VFuture[T]): Future[T] = f.asScala
}

private[vertx] object FutureRunAsync extends RunAsync[Future] {
override def apply[T](f: => Future[T]): Unit = f
}

implicit class VertxFutureToScalaFuture[A](future: => VFuture[A]) {
def asScala: Future[A] = {
val promise = Promise[A]()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package sttp.tapir.server.vertx.interpreters

trait RunAsync[F[_]] {
def apply[T](f: => F[T]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.VertxBodyListener
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.RioFromVFuture
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync}
import sttp.tapir.server.vertx.zio.streams._
import sttp.tapir.ztapir.{RIOMonadError, ZServerEndpoint}
import zio._
Expand All @@ -32,7 +32,8 @@ trait VertxZioServerInterpreter[R] extends CommonServerInterpreter {
)(implicit runtime: Runtime[R]): Handler[RoutingContext] = {
val fromVFuture = new RioFromVFuture[R]
implicit val monadError: RIOMonadError[R] = new RIOMonadError[R]
implicit val bodyListener: BodyListener[RIO[R, *], RoutingContext => Future[Void]] = new VertxBodyListener[RIO[R, *]]
implicit val bodyListener: BodyListener[RIO[R, *], RoutingContext => Future[Void]] =
new VertxBodyListener[RIO[R, *]](new ZioRunAsync(runtime))
val zioReadStream = zioReadStreamCompatible(vertxZioServerOptions)
val interpreter = new ServerInterpreter[ZioStreams, RIO[R, *], RoutingContext => Future[Void], ZioStreams](
_ => List(e),
Expand Down Expand Up @@ -119,6 +120,12 @@ object VertxZioServerInterpreter {
def apply[T](f: => Future[T]): RIO[R, T] = f.asRIO
}

private[vertx] class ZioRunAsync[R](runtime: Runtime[R]) extends RunAsync[RIO[R, *]] {
override def apply[T](f: => RIO[R, T]): Unit = Unsafe.unsafe(implicit u => {
runtime.unsafe.runToFuture(f)
})
}

implicit class VertxFutureToRIO[A](f: => Future[A]) {
def asRIO[R]: RIO[R, A] = {
ZIO.async { cb =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import io.vertx.ext.web.{Route, Router, RoutingContext}
import sttp.capabilities.zio.ZioStreams
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.RioFromVFuture
import sttp.tapir.server.vertx.zio.VertxZioServerInterpreter.{RioFromVFuture, ZioRunAsync}
import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture, RunAsync}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.zio.streams._
import sttp.tapir.server.vertx.VertxBodyListener
Expand All @@ -33,7 +33,8 @@ trait VertxZioServerInterpreter[R <: Blocking] extends CommonServerInterpreter {
)(implicit runtime: Runtime[R]): Handler[RoutingContext] = {
val fromVFuture = new RioFromVFuture[R]
implicit val monadError: RIOMonadError[R] = new RIOMonadError[R]
implicit val bodyListener: BodyListener[RIO[R, *], RoutingContext => Future[Void]] = new VertxBodyListener[RIO[R, *]]
implicit val bodyListener: BodyListener[RIO[R, *], RoutingContext => Future[Void]] =
new VertxBodyListener[RIO[R, *]](new ZioRunAsync(runtime))
val zioReadStream = zioReadStreamCompatible(vertxZioServerOptions)
val interpreter = new ServerInterpreter[ZioStreams, RIO[R, *], RoutingContext => Future[Void], ZioStreams](
_ => List(e),
Expand Down Expand Up @@ -119,6 +120,10 @@ object VertxZioServerInterpreter {
def apply[T](f: => Future[T]): RIO[R, T] = f.asRIO
}

private[vertx] class ZioRunAsync[R](runtime: Runtime[R]) extends RunAsync[RIO[R, *]] {
override def apply[T](f: => RIO[R, T]): Unit = runtime.unsafeRunAsync(f)(_ => ())
}

implicit class VertxFutureToRIO[A](f: => Future[A]) {
def asRIO[R]: RIO[R, A] = {
RIO.effectAsync { cb =>
Expand Down

0 comments on commit f3ce2b1

Please sign in to comment.