diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index af6e99b..8378b21 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -7,43 +7,43 @@ my-app { -akka { - log-dead-letters = off - jvm-exit-on-fatal-error = off - loggers = [akka.event.slf4j.Slf4jLogger] - loglevel = DEBUG - logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" - http { - server { - request-timeout = 300 s - preview { - enable-http2 = on - } - idle-timeout = 3600 s - socket-options { - tcp-keep-alive = true - } - } - client { - idle-timeout = 300 s - connecting-timeout = 60 s - parsing { - max-content-length = infinite - } - - } - host-connection-pool { - idle-timeout = 300 s - max-retries = 1 - max-connections = 2048 - min-connections = 5 - max-open-requests = 2048 - response-entity-subscription-timeout = 30.minute - } - parsing { - max-to-strict-bytes = 32m - } - } - } - -akka.http.client.response-entity-subscription-timeout = 30.minute \ No newline at end of file +# akka { +# log-dead-letters = off +# jvm-exit-on-fatal-error = off +# loggers = [akka.event.slf4j.Slf4jLogger] +# loglevel = DEBUG +# logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" +# http { +# server { +# request-timeout = 300 s +# preview { +# enable-http2 = on +# } +# idle-timeout = 3600 s +# socket-options { +# tcp-keep-alive = true +# } +# } +# client { +# idle-timeout = 300 s +# connecting-timeout = 60 s +# parsing { +# max-content-length = infinite +# } +# +# } +# host-connection-pool { +# idle-timeout = 300 s +# max-retries = 1 +# max-connections = 2048 +# min-connections = 5 +# max-open-requests = 2048 +# response-entity-subscription-timeout = 30.minute +# } +# parsing { +# max-to-strict-bytes = 32m +# } +# } +# } +# +# akka.http.client.response-entity-subscription-timeout = 30.minute \ No newline at end of file diff --git a/src/main/scala/com/example/DocumentRoute.scala b/src/main/scala/com/example/DocumentRoute.scala index d9e4137..3d28509 100644 --- a/src/main/scala/com/example/DocumentRoute.scala +++ b/src/main/scala/com/example/DocumentRoute.scala @@ -11,9 +11,10 @@ import akka.http.scaladsl.server.{RequestContext, Route, RouteResult} import akka.util.Timeout import com.example.Constant.fileContent +import java.time.LocalDateTime import scala.concurrent.duration.DurationInt -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, blocking} //#import-json-formats //#user-routes-class @@ -27,7 +28,6 @@ class DocumentRoute()(implicit val system: ActorSystem[_]) { // If ask takes more time than this to complete the request is failed private implicit val timeout = Timeout.create(system.settings.config.getDuration("my-app.routes.ask-timeout")) implicit val ec = ExecutionContext.global - var index = 0 val getPDF: Future[String] = { @@ -45,15 +45,17 @@ class DocumentRoute()(implicit val system: ActorSystem[_]) { pathEnd { concat( get { - index = index + 1 - println("Sending pdf " + index) - complete(getPDF) + parameters('index.?) { index: Option[String] => + onSuccess(slowOp) { + println(s"[${LocalDateTime.now}] --> Sending pdf ${index.getOrElse("")}") + complete(getPDF) + } + } }, post { entity(as[String]) { pdf => onSuccess(getPDF) { performed => { - index = index + 1 - println("Sending pdf " + index) + println("Creating pdf " + performed) complete((StatusCodes.Created, performed)) } } @@ -63,5 +65,10 @@ class DocumentRoute()(implicit val system: ActorSystem[_]) { } //#users-get-delete + def slowOp: Future[Unit] = Future { + blocking { + Thread.sleep(5000) + } + } //#all-routes } diff --git a/src/main/scala/com/example/DownloadPDF.scala b/src/main/scala/com/example/DownloadPDF.scala index 936ab1d..5261f4e 100644 --- a/src/main/scala/com/example/DownloadPDF.scala +++ b/src/main/scala/com/example/DownloadPDF.scala @@ -9,8 +9,10 @@ import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.scaladsl.{Flow, Framing, Sink, Source} import akka.util.ByteString -import scala.concurrent.{Await, ExecutionContext, Future} +import java.time.LocalDateTime +import scala.concurrent.{Await, ExecutionContext, Future, blocking} import scala.concurrent.duration.Duration +import scala.util.{Failure, Success, Try} object DownloadPDF extends App{ @@ -66,4 +68,23 @@ object DownloadPDF extends App{ response.entity.dataBytes .runReduce(_ ++ _).map(_.utf8String) } + + + val pool= Http().superPool[Int]() + Source(1 to 1000) + .map(i => (HttpRequest(uri = s"http://localhost:8080/pdf?index=$i", method = HttpMethods.GET), i)) + .via(pool) + .mapAsync(2)(asyncOp).runWith(Sink.foreach { + case (Success(r), i) => println(s"[${LocalDateTime.now}] $i succeeded") + case (Failure(e), i) => println(s"[${LocalDateTime.now}] $i failed: $e") + }) + + def asyncOp(result: (Try[HttpResponse], Int)): Future[(Try[HttpResponse], Int)] = + Future { + blocking { + Thread.sleep(100) // simulate work + result._1.get.discardEntityBytes() + result + } + } } diff --git a/src/main/scala/com/example/StartHttpServer.scala b/src/main/scala/com/example/StartHttpServer.scala index 0071e19..7c74b42 100644 --- a/src/main/scala/com/example/StartHttpServer.scala +++ b/src/main/scala/com/example/StartHttpServer.scala @@ -44,7 +44,7 @@ object StartHttpServer extends App { //12670131 val keyerFunction: PartialFunction[RequestContext, Uri] = { - case r: RequestContext => r.request.uri + case r: RequestContext => r.request.uri.withRawQueryString("") } val defaultCachingSettings = CachingSettings(context.system)