Description
Context
We use Anorm with PekkoStream (previously AkkaStream) in a Play Framework application. Our usage is pretty standard, except that the queries we run with Anorm target a database that can take several seconds (sometimes even longer) to start returning results.
In this situation, we noticed that all "default" threads can become blocked, so the Play application becomes unresponsive to every request (including a healthcheck request that is completely unrelated and just answers OK without doing anything else).
Details
Our usage
To show that we're not doing anything custom, here is the minimized code:
val source: Source[OurDataModel, NotUsed] = PekkoStream.source(SQL("SELECT * FROM ..."), rowParser)
val jsonSource: Source[ByteString, NotUsed] = source.map(row => transformRowToJson(row))
Ok.chunked(jsonSource) // Play Result
Observations
A thread dump taken while the application is unresponsive shows threads waiting (hanging) in the preStart of the stream:
....
at app//anorm.Cursor$.apply(Cursor.scala:31)
at app//anorm.Sql$.unsafeCursor(Anorm.scala:248)
at app//anorm.PekkoStream$ResultSource$$anon$1.nextCursor(PekkoStream.scala:149)
at app//anorm.PekkoStream$ResultSource$$anon$1.preStart(PekkoStream.scala:126)
at app//org.apache.pekko.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:317)
at app//org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:631)
...
at app//org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:729)
at app//org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at app//org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at app//org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
at app//org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
at app//org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
at java.base@21.0.5/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
at java.base@21.0.5/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
at java.base@21.0.5/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
at java.base@21.0.5/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
at java.base@21.0.5/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
Relevant code in Anorm:
anorm/pekko/src/main/scala/anorm/PekkoStream.scala, lines 123 to 130 in 7501f26:
override def preStart(): Unit = {
  try {
    resultSet = sql.unsafeResultSet(connection)
    nextCursor()
  } catch {
    case NonFatal(cause) => failWith(cause)
  }
}
Workaround
We've been able to fix the issue on our side by explicitly wrapping the preStart code in a scala.concurrent.blocking block.
 override def preStart(): Unit = {
+  blocking {
     try {
       resultSet = sql.unsafeResultSet(connection)
       nextCursor()
     } catch {
       case NonFatal(cause) => failWith(cause)
     }
+  }
 }
(This requires that we "fork" this class in our own code.)
As a consequence, the thread pool of the Pekko actors can grow so that it can still run non-blocking code: the threads waiting on the database no longer block other requests.
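To illustrate that mechanism outside of Pekko and Anorm, here is a minimal sketch in plain Scala. It assumes that a fork-join worker thread handles `scala.concurrent.blocking` by delegating to `ForkJoinPool.managedBlock`; our understanding is that Pekko's fork-join dispatcher threads do something similar, but this sketch is not Pekko code and we have not verified that detail here. `BlockingAwareThread`, `BlockingHintDemo` and `healthcheckRanDuringQuery` are hypothetical names. A pool with a single worker runs a simulated slow query, and we check whether an unrelated "healthcheck" task gets to run while the query is waiting.

```scala
import java.util.concurrent.{CountDownLatch, ForkJoinPool, ForkJoinWorkerThread, TimeUnit}
import scala.concurrent.{Await, BlockContext, CanAwait, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

// Hypothetical worker thread (assumption: similar in spirit to Pekko's fork-join threads)
// that turns `blocking { ... }` into ForkJoinPool.managedBlock, so the pool may add a spare thread.
final class BlockingAwareThread(pool: ForkJoinPool)
    extends ForkJoinWorkerThread(pool) with BlockContext {
  override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
    var result: Option[T] = None
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
      def block(): Boolean = { result = Some(thunk); true }
      def isReleasable(): Boolean = result.isDefined
    })
    result.get
  }
}

object BlockingHintDemo {
  private val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
    def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = new BlockingAwareThread(pool)
  }

  /** True if the "healthcheck" ran while the only worker was stuck in the simulated query. */
  def healthcheckRanDuringQuery(useBlockingHint: Boolean): Boolean = {
    // parallelism = 1, maximumPoolSize = 4, minimumRunnable = 1 (JDK 9+ constructor):
    // one worker, and spare threads are allowed when that worker is managed-blocked.
    val pool = new ForkJoinPool(1, factory, null, true, 0, 4, 1, null, 60L, TimeUnit.SECONDS)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val queryStarted = new CountDownLatch(1)
    val healthcheckDone = new CountDownLatch(1)

    // Simulated slow JDBC call: holds its thread until the healthcheck ran, or 1 s elapsed.
    val query = Future {
      queryStarted.countDown()
      if (useBlockingHint) blocking(healthcheckDone.await(1, TimeUnit.SECONDS))
      else healthcheckDone.await(1, TimeUnit.SECONDS)
    }
    queryStarted.await() // the only worker is now busy with the query
    Future { healthcheckDone.countDown() } // unrelated, non-blocking request

    try Await.result(query, 5.seconds)
    finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(s"without blocking: ${healthcheckRanDuringQuery(useBlockingHint = false)}")
    println(s"with blocking:    ${healthcheckRanDuringQuery(useBlockingHint = true)}")
  }
}
```

Without the hint, the healthcheck can only run after the simulated query gives up, so the call returns false; with the hint, `minimumRunnable = 1` lets the pool start a spare thread that runs the healthcheck while the query is still waiting, so it returns true. This relies on the documented `ForkJoinPool` behaviour; the actual pool sizes and limits of a Pekko dispatcher will differ.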
Now the question is: should this be the default behavior? Since Anorm and JDBC drivers are blocking by design, shouldn't preStart be robust, so that if it blocks, it doesn't affect the Pekko actor threads? Would there be any downside to wrapping this code in blocking even when it isn't actually blocking?
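On the last question: `blocking` is only a hint. It hands the body to `BlockContext.current.blockOn`, and on a thread without a special context the body is simply evaluated. The sketch below (hypothetical names `CountingBlockContext` and `BlockingIsJustAHint`) installs a context with `BlockContext.withBlockContext` that only counts calls, to show that even a cheap, non-blocking body goes through `blockOn`. The cost therefore depends on the current context: according to the `ForkJoinPool.managedBlock` documentation, a fork-join pool "may first be expanded" before the body runs, so a spare thread might be started even for a short call. We have not measured this in Pekko.

```scala
import scala.concurrent.{BlockContext, CanAwait, blocking}

// Hypothetical BlockContext that records each blocking hint, then just runs the body.
final class CountingBlockContext extends BlockContext {
  var calls = 0
  override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
    calls += 1
    thunk
  }
}

object BlockingIsJustAHint {
  def main(args: Array[String]): Unit = {
    // On the main thread (no custom context), `blocking` just evaluates its body.
    val plain = blocking(40 + 2)

    val ctx = new CountingBlockContext
    val sum = BlockContext.withBlockContext(ctx) {
      blocking(1 + 2) // cheap body, still routed through ctx.blockOn
    }
    println(s"plain=$plain, sum=$sum, blockOn calls=${ctx.calls}") // prints "plain=42, sum=3, blockOn calls=1"
  }
}
```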
If it makes sense to you, I'll be happy to open the PR :)