Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 40 additions & 40 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,43 @@ my-app {



akka {
log-dead-letters = off
jvm-exit-on-fatal-error = off
loggers = [akka.event.slf4j.Slf4jLogger]
loglevel = DEBUG
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
http {
server {
request-timeout = 300 s
preview {
enable-http2 = on
}
idle-timeout = 3600 s
socket-options {
tcp-keep-alive = true
}
}
client {
idle-timeout = 300 s
connecting-timeout = 60 s
parsing {
max-content-length = infinite
}

}
host-connection-pool {
idle-timeout = 300 s
max-retries = 1
max-connections = 2048
min-connections = 5
max-open-requests = 2048
response-entity-subscription-timeout = 30.minute
}
parsing {
max-to-strict-bytes = 32m
}
}
}

akka.http.client.response-entity-subscription-timeout = 30.minute
# akka {
# log-dead-letters = off
# jvm-exit-on-fatal-error = off
# loggers = [akka.event.slf4j.Slf4jLogger]
# loglevel = DEBUG
# logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
# http {
# server {
# request-timeout = 300 s
# preview {
# enable-http2 = on
# }
# idle-timeout = 3600 s
# socket-options {
# tcp-keep-alive = true
# }
# }
# client {
# idle-timeout = 300 s
# connecting-timeout = 60 s
# parsing {
# max-content-length = infinite
# }
#
# }
# host-connection-pool {
# idle-timeout = 300 s
# max-retries = 1
# max-connections = 2048
# min-connections = 5
# max-open-requests = 2048
# response-entity-subscription-timeout = 30.minute
# }
# parsing {
# max-to-strict-bytes = 32m
# }
# }
# }
#
# akka.http.client.response-entity-subscription-timeout = 30.minute
21 changes: 14 additions & 7 deletions src/main/scala/com/example/DocumentRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import akka.http.scaladsl.server.{RequestContext, Route, RouteResult}
import akka.util.Timeout
import com.example.Constant.fileContent

import java.time.LocalDateTime

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, blocking}

//#import-json-formats
//#user-routes-class
Expand All @@ -27,7 +28,6 @@ class DocumentRoute()(implicit val system: ActorSystem[_]) {
// If ask takes more time than this to complete the request is failed
private implicit val timeout = Timeout.create(system.settings.config.getDuration("my-app.routes.ask-timeout"))
implicit val ec = ExecutionContext.global
var index = 0


val getPDF: Future[String] = {
Expand All @@ -45,15 +45,17 @@ class DocumentRoute()(implicit val system: ActorSystem[_]) {
pathEnd {
concat(
get {
index = index + 1
println("Sending pdf " + index)
complete(getPDF)
parameters('index.?) { index: Option[String] =>
onSuccess(slowOp) {
println(s"[${LocalDateTime.now}] --> Sending pdf ${index.getOrElse("")}")
complete(getPDF)
}
}
},
post {
entity(as[String]) { pdf =>
onSuccess(getPDF) { performed => {
index = index + 1
println("Sending pdf " + index)
println("Creating pdf " + performed)
complete((StatusCodes.Created, performed))
}
}
Expand All @@ -63,5 +65,10 @@ class DocumentRoute()(implicit val system: ActorSystem[_]) {
}
//#users-get-delete

def slowOp: Future[Unit] = Future {
blocking {
Thread.sleep(5000)
}
}
//#all-routes
}
23 changes: 22 additions & 1 deletion src/main/scala/com/example/DownloadPDF.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{Flow, Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, ExecutionContext, Future}
import java.time.LocalDateTime
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object DownloadPDF extends App{

Expand Down Expand Up @@ -66,4 +68,23 @@ object DownloadPDF extends App{
response.entity.dataBytes
.runReduce(_ ++ _).map(_.utf8String)
}


val pool= Http().superPool[Int]()
Source(1 to 1000)
.map(i => (HttpRequest(uri = s"http://localhost:8080/pdf?index=$i", method = HttpMethods.GET), i))
.via(pool)
.mapAsync(2)(asyncOp).runWith(Sink.foreach {
case (Success(r), i) => println(s"[${LocalDateTime.now}] $i succeeded")
case (Failure(e), i) => println(s"[${LocalDateTime.now}] $i failed: $e")
})

def asyncOp(result: (Try[HttpResponse], Int)): Future[(Try[HttpResponse], Int)] =
Future {
blocking {
Thread.sleep(100) // simulate work
result._1.get.discardEntityBytes()
result
}
}
}
2 changes: 1 addition & 1 deletion src/main/scala/com/example/StartHttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object StartHttpServer extends App {
//12670131

val keyerFunction: PartialFunction[RequestContext, Uri] = {
case r: RequestContext => r.request.uri
case r: RequestContext => r.request.uri.withRawQueryString("")
}
val defaultCachingSettings = CachingSettings(context.system)

Expand Down