Skip to content

Commit 0ab5ca9

Browse files
authored
Merge pull request #1157 from matthughes/statementCacheFix
Fixes memory leak in prepared statement cache. Fixes #1143
2 parents 7775504 + c8e2f32 commit 0ab5ca9

File tree

11 files changed

+246
-47
lines changed

11 files changed

+246
-47
lines changed

modules/core/shared/src/main/scala/Session.scala

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -218,26 +218,22 @@ trait Session[F[_]] {
218218
/**
219219
* Resource that prepares a query, yielding a `PreparedQuery` which can be executed multiple
220220
* times with different arguments.
221-
*
222-
* Note: this method only exists to ease migration from Skunk 0.3 and prior. Use the
223-
* non-resource variant instead.
224221
*
222+
* The prepared query is not cached and is closed upon resource cleanup.
223+
*
225224
* @group Queries
226225
*/
227-
def prepareR[A, B](query: Query[A, B]): Resource[F, PreparedQuery[F, A, B]] =
228-
Resource.eval(prepare(query))
226+
def prepareR[A, B](query: Query[A, B]): Resource[F, PreparedQuery[F, A, B]]
229227

230228
/**
231229
* Prepare an `INSERT`, `UPDATE`, or `DELETE` command that returns no rows. The resulting
232230
* `PreparedCommand` can be executed multiple times with different arguments.
233-
*
234-
* Note: this method only exists to ease migration from Skunk 0.3 and prior. Use the
235-
* non-resource variant instead.
236231
*
232+
* The prepared command is not cached and is closed upon resource cleanup.
233+
*
237234
* @group Commands
238235
*/
239-
def prepareR[A](command: Command[A]): Resource[F, PreparedCommand[F, A]] =
240-
Resource.eval(prepare(command))
236+
def prepareR[A](command: Command[A]): Resource[F, PreparedCommand[F, A]]
241237

242238
/**
243239
* Transform a `Command` into a `Pipe` from inputs to `Completion`s.
@@ -300,6 +296,10 @@ trait Session[F[_]] {
300296
*/
301297
def parseCache: Parse.Cache[F]
302298

299+
/**
300+
* Send a Close to server for each prepared statement that has been evicted.
301+
*/
302+
def closeEvictedPreparedStatements: F[Unit]
303303
}
304304

305305

@@ -313,9 +313,11 @@ object Session {
313313
abstract class Impl[F[_]: MonadCancelThrow] extends Session[F] {
314314

315315
override def execute[A, B](query: Query[A, B])(args: A): F[List[B]] =
316-
Monad[F].flatMap(prepare(query))(_.cursor(args).use {
317-
_.fetch(Int.MaxValue).map { case (rows, _) => rows }
318-
})
316+
Monad[F].flatMap(prepare(query)) { pq =>
317+
pq.cursor(args).use {
318+
_.fetch(Int.MaxValue).map { case (rows, _) => rows }
319+
}
320+
}
319321

320322
override def unique[A, B](query: Query[A, B])(args: A): F[B] =
321323
Monad[F].flatMap(prepare(query))(_.unique(args))
@@ -359,15 +361,21 @@ object Session {
359361
* isn't running arbitrary statements then `minimal` might be more efficient.
360362
*/
361363
def full[F[_]: Monad]: Recycler[F, Session[F]] =
362-
ensureIdle[F] <+> unlistenAll <+> resetAll
364+
closeEvictedPreparedStatements[F] <+> ensureIdle[F] <+> unlistenAll <+> resetAll
363365

364366
/**
365367
* Ensure the session is idle, then run a trivial query to ensure the connection is in working
366368
* order. In most cases this check is sufficient.
367369
*/
368370
def minimal[F[_]: Monad]: Recycler[F, Session[F]] =
369-
ensureIdle[F] <+> Recycler(_.unique(Query("VALUES (true)", Origin.unknown, Void.codec, bool)))
371+
closeEvictedPreparedStatements[F] <+> ensureIdle[F] <+> Recycler(_.unique(Query("VALUES (true)", Origin.unknown, Void.codec, bool)))
370372

373+
/**
374+
* Send a Close to server for each prepared statement that was evicted during this session.
375+
*/
376+
def closeEvictedPreparedStatements[F[_]: Monad]: Recycler[F, Session[F]] =
377+
Recycler(_.closeEvictedPreparedStatements.as(true))
378+
371379
/**
372380
* Yield `true` the session is idle (i.e., that there is no ongoing transaction), otherwise
373381
* yield false. This check does not require network IO.
@@ -382,7 +390,6 @@ object Session {
382390
/** Reset all variables to system defaults and yield `true`. */
383391
def resetAll[F[_]: Functor]: Recycler[F, Session[F]] =
384392
Recycler(_.execute(Command("RESET ALL", Origin.unknown, Void.codec)).as(true))
385-
386393
}
387394

388395
/**
@@ -667,6 +674,12 @@ object Session {
667674
override def prepare[A](command: Command[A]): F[PreparedCommand[F, A]] =
668675
proto.prepare(command, typer).map(PreparedCommand.fromProto(_))
669676

677+
override def prepareR[A, B](query: Query[A, B]): Resource[F, PreparedQuery[F, A, B]] =
678+
proto.prepareR(query, typer).map(PreparedQuery.fromProto(_, redactionStrategy))
679+
680+
override def prepareR[A](command: Command[A]): Resource[F, PreparedCommand[F, A]] =
681+
proto.prepareR(command, typer).map(PreparedCommand.fromProto(_))
682+
670683
override def transaction[A]: Resource[F, Transaction[F]] =
671684
Transaction.fromSession(this, namer, none, none)
672685

@@ -679,6 +692,8 @@ object Session {
679692
override def parseCache: Parse.Cache[F] =
680693
proto.parseCache
681694

695+
override def closeEvictedPreparedStatements: F[Unit] =
696+
proto.closeEvictedPreparedStatements
682697
}
683698
}
684699
}
@@ -724,6 +739,10 @@ object Session {
724739

725740
override def prepare[A](command: Command[A]): G[PreparedCommand[G,A]] = fk(outer.prepare(command)).map(_.mapK(fk))
726741

742+
override def prepareR[A, B](query: Query[A, B]): Resource[G, PreparedQuery[G, A, B]] = outer.prepareR(query).mapK(fk).map(_.mapK(fk))
743+
744+
override def prepareR[A](command: Command[A]): Resource[G, PreparedCommand[G, A]] = outer.prepareR(command).mapK(fk).map(_.mapK(fk))
745+
727746
override def transaction[A]: Resource[G,Transaction[G]] = outer.transaction.mapK(fk).map(_.mapK(fk))
728747

729748
override def transaction[A](isolationLevel: TransactionIsolationLevel, accessMode: TransactionAccessMode): Resource[G,Transaction[G]] =
@@ -737,6 +756,7 @@ object Session {
737756

738757
override def parseCache: Parse.Cache[G] = outer.parseCache.mapK(fk)
739758

759+
override def closeEvictedPreparedStatements: G[Unit] = fk(outer.closeEvictedPreparedStatements)
740760
}
741761
}
742762

modules/core/shared/src/main/scala/data/SemispaceCache.scala

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,16 @@ import cats.syntax.all._
1010
* Cache based on a two-generation GC.
1111
* Taken from https://twitter.com/pchiusano/status/1260255494519865346
1212
*/
13-
sealed abstract case class SemispaceCache[K, V](gen0: Map[K, V], gen1: Map[K, V], max: Int) {
13+
sealed abstract case class SemispaceCache[K, V](gen0: Map[K, V], gen1: Map[K, V], max: Int, evicted: SemispaceCache.EvictionSet[V]) {
1414

1515
assert(max >= 0)
1616
assert(gen0.size <= max)
1717
assert(gen1.size <= max)
1818

1919
def insert(k: K, v: V): SemispaceCache[K, V] =
20-
if (max == 0) this // special case, can't insert!
21-
else if (gen0.size < max) SemispaceCache(gen0 + (k -> v), gen1, max) // room in gen0, done!
22-
else SemispaceCache(Map(k -> v), gen0, max) // no room in gen0, slide it down
20+
if (max == 0) SemispaceCache(gen0, gen1, max, evicted + v) // immediately evict
21+
else if (gen0.size < max) SemispaceCache(gen0 + (k -> v), gen1, max, evicted - v) // room in gen0, done!
22+
else SemispaceCache(Map(k -> v), gen0, max, evicted ++ gen1.values - v)// no room in gen0, slide it down
2323

2424
def lookup(k: K): Option[(SemispaceCache[K, V], V)] =
2525
gen0.get(k).tupleLeft(this) orElse // key is in gen0, done!
@@ -30,14 +30,48 @@ sealed abstract case class SemispaceCache[K, V](gen0: Map[K, V], gen1: Map[K, V]
3030

3131
def values: List[V] =
3232
(gen0.values.toSet | gen1.values.toSet).toList
33+
34+
def evictAll: SemispaceCache[K, V] =
35+
SemispaceCache(Map.empty, Map.empty, max, evicted ++ gen0.values ++ gen1.values)
36+
37+
def clearEvicted: (SemispaceCache[K, V], List[V]) =
38+
(SemispaceCache(gen0, gen1, max, evicted.clear), evicted.toList)
3339
}
3440

3541
object SemispaceCache {
3642

37-
private def apply[K, V](gen0: Map[K, V], gen1: Map[K, V], max: Int): SemispaceCache[K, V] =
38-
new SemispaceCache[K, V](gen0, gen1, max) {}
43+
private def apply[K, V](gen0: Map[K, V], gen1: Map[K, V], max: Int, evicted: EvictionSet[V]): SemispaceCache[K, V] =
44+
new SemispaceCache[K, V](gen0, gen1, max, evicted) {}
45+
46+
def empty[K, V](max: Int, trackEviction: Boolean): SemispaceCache[K, V] =
47+
SemispaceCache[K, V](Map.empty, Map.empty, max max 0, if (trackEviction) EvictionSet.empty else new EvictionSet.ZeroEvictionSet)
48+
49+
sealed trait EvictionSet[V] {
50+
def +(v: V): EvictionSet[V]
51+
def ++(vs: Iterable[V]): EvictionSet[V]
52+
def -(v: V): EvictionSet[V]
53+
def toList: List[V]
54+
def clear: EvictionSet[V]
55+
}
56+
57+
private[SemispaceCache] object EvictionSet {
58+
59+
class ZeroEvictionSet[V] extends EvictionSet[V] {
60+
def +(v: V): EvictionSet[V] = this
61+
def ++(vs: Iterable[V]): EvictionSet[V] = this
62+
def -(v: V): EvictionSet[V] = this
63+
def toList: List[V] = Nil
64+
def clear: EvictionSet[V] = this
65+
}
3966

40-
def empty[K, V](max: Int): SemispaceCache[K, V] =
41-
SemispaceCache[K, V](Map.empty, Map.empty, max max 0)
67+
class DefaultEvictionSet[V](underlying: Set[V]) extends EvictionSet[V] {
68+
def +(v: V): EvictionSet[V] = new DefaultEvictionSet(underlying + v)
69+
def ++(vs: Iterable[V]): EvictionSet[V] = new DefaultEvictionSet(underlying ++ vs)
70+
def -(v: V): EvictionSet[V] = new DefaultEvictionSet(underlying - v)
71+
def toList: List[V] = underlying.toList
72+
def clear: EvictionSet[V] = new DefaultEvictionSet(Set.empty)
73+
}
4274

75+
def empty[V]: EvictionSet[V] = new DefaultEvictionSet(Set.empty)
76+
}
4377
}

modules/core/shared/src/main/scala/net/Protocol.scala

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ trait Protocol[F[_]] {
7373
*/
7474
def prepare[A, B](query: Query[A, B], ty: Typer): F[Protocol.PreparedQuery[F, A, B]]
7575

76+
/** Like [[prepare]] but does not cache the result and closes the command upon resource cleanup. */
77+
def prepareR[A](command: Command[A], ty: Typer): Resource[F, Protocol.PreparedCommand[F, A]]
78+
79+
/** Like [[prepare]] but does not cache the result and closes the query upon resource cleanup. */
80+
def prepareR[A, B](query: Query[A, B], ty: Typer): Resource[F, Protocol.PreparedQuery[F, A, B]]
81+
7682
/**
7783
* Execute a non-parameterized command (a statement that produces no rows), yielding a
7884
* `Completion`. This is equivalent to `prepare` + `bind` + `execute` but it uses the "simple"
@@ -103,7 +109,7 @@ trait Protocol[F[_]] {
103109
* Cleanup the session. This will close any cached prepared statements.
104110
*/
105111
def cleanup: F[Unit]
106-
112+
107113
/**
108114
* Signal representing the current transaction status as reported by `ReadyForQuery`. It's not
109115
* clear that this is a useful thing to expose.
@@ -115,6 +121,9 @@ trait Protocol[F[_]] {
115121

116122
/** Cache for the `Parse` protocol. */
117123
def parseCache: Parse.Cache[F]
124+
125+
/** Closes any prepared statements that have been evicted from the parse cache. */
126+
def closeEvictedPreparedStatements: F[Unit]
118127
}
119128

120129
object Protocol {
@@ -247,6 +256,20 @@ object Protocol {
247256
override def prepare[A, B](query: Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] =
248257
protocol.Prepare[F](describeCache, parseCache, redactionStrategy).apply(query, ty)
249258

259+
override def prepareR[A](command: Command[A], ty: Typer): Resource[F, Protocol.PreparedCommand[F, A]] = {
260+
val acquire = Parse.Cache.empty[F](1).flatMap { pc =>
261+
protocol.Prepare[F](describeCache, pc, redactionStrategy).apply(command, ty)
262+
}
263+
Resource.make(acquire)(pc => protocol.Close[F].apply(pc.id))
264+
}
265+
266+
override def prepareR[A, B](query: Query[A, B], ty: Typer): Resource[F, Protocol.PreparedQuery[F, A, B]] = {
267+
val acquire = Parse.Cache.empty[F](1).flatMap { pc =>
268+
protocol.Prepare[F](describeCache, pc, redactionStrategy).apply(query, ty)
269+
}
270+
Resource.make(acquire)(pq => protocol.Close[F].apply(pq.id))
271+
}
272+
250273
override def execute(command: Command[Void]): F[Completion] =
251274
protocol.Query[F](redactionStrategy).apply(command)
252275

@@ -259,8 +282,8 @@ object Protocol {
259282
protocol.Startup[F].apply(user, database, password, parameters)
260283

261284
override def cleanup: F[Unit] =
262-
parseCache.value.values.flatMap(xs => xs.traverse_(protocol.Close[F].apply))
263-
285+
parseCache.value.values.flatMap(_.traverse_(protocol.Close[F].apply))
286+
264287
override def transactionStatus: Signal[F, TransactionStatus] =
265288
bms.transactionStatus
266289

@@ -270,6 +293,8 @@ object Protocol {
270293
override val parseCache: Parse.Cache[F] =
271294
pc
272295

296+
override def closeEvictedPreparedStatements: F[Unit] =
297+
pc.value.clearEvicted.flatMap(_.traverse_(protocol.Close[F].apply))
273298
}
274299
}
275300

modules/core/shared/src/main/scala/net/protocol/Close.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,5 @@ object Close {
4040
_ <- send(Flush)
4141
_ <- expect { case CloseComplete => }
4242
} yield ()
43-
4443
}
45-
4644
}

modules/core/shared/src/main/scala/net/protocol/Describe.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ object Describe {
2626
commandCapacity: Int,
2727
queryCapacity: Int,
2828
): F[Cache[F]] = (
29-
StatementCache.empty[F, Unit](commandCapacity),
30-
StatementCache.empty[F, TypedRowDescription](queryCapacity)
29+
StatementCache.empty[F, Unit](commandCapacity, false),
30+
StatementCache.empty[F, TypedRowDescription](queryCapacity, false)
3131
).mapN(Describe.Cache(_, _))
3232
}
3333

modules/core/shared/src/main/scala/net/protocol/Parse.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ object Parse {
2020

2121
object Cache {
2222
def empty[F[_]: Functor: Ref.Make](capacity: Int): F[Cache[F]] =
23-
StatementCache.empty[F, StatementId](capacity).map(Parse.Cache(_))
23+
StatementCache.empty[F, StatementId](capacity, true).map(Parse.Cache(_))
2424
}
2525

2626
}

modules/core/shared/src/main/scala/net/protocol/ParseDescribe.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ object ParseDescribe {
5151
TooManyParametersException(stmt).raiseError[F, (F[StatementId], StatementId => F[Unit])]
5252

5353
case Right(os) =>
54+
5455
OptionT(parseCache.value.get(stmt)).map(id => (id.pure, (_:StatementId) => ().pure)).getOrElse {
5556
val pre = for {
5657
id <- nextName("statement").map(StatementId(_))
@@ -109,7 +110,7 @@ object ParseDescribe {
109110
}
110111

111112
exchange("parse+describe") { (span: Span[F]) =>
112-
parseExchange(cmd, ty)(span).flatMap{ case (preParse, postParse) =>
113+
parseExchange(cmd, ty)(span).flatMap { case (preParse, postParse) =>
113114
describeExchange(span).flatMap { case (preDesc, postDesc) =>
114115
for {
115116
id <- preParse
@@ -155,7 +156,7 @@ object ParseDescribe {
155156

156157

157158
exchange("parse+describe") { (span: Span[F]) =>
158-
parseExchange(query, ty)(span).flatMap{ case (preParse, postParse) =>
159+
parseExchange(query, ty)(span).flatMap { case (preParse, postParse) =>
159160
describeExchange(span).flatMap { case (preDesc, postDesc) =>
160161
for {
161162
id <- preParse

modules/core/shared/src/main/scala/util/StatementCache.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ sealed trait StatementCache[F[_], V] { outer =>
1818
def containsKey(k: Statement[_]): F[Boolean]
1919
def clear: F[Unit]
2020
def values: F[List[V]]
21+
private[skunk] def clearEvicted: F[List[V]]
2122

2223
def mapK[G[_]](fk: F ~> G): StatementCache[G, V] =
2324
new StatementCache[G, V] {
@@ -26,36 +27,39 @@ sealed trait StatementCache[F[_], V] { outer =>
2627
def containsKey(k: Statement[_]): G[Boolean] = fk(outer.containsKey(k))
2728
def clear: G[Unit] = fk(outer.clear)
2829
def values: G[List[V]] = fk(outer.values)
30+
def clearEvicted: G[List[V]] = fk(outer.clearEvicted)
2931
}
3032

3133
}
3234

3335
object StatementCache {
3436

35-
def empty[F[_]: Functor: Ref.Make, V](max: Int): F[StatementCache[F, V]] =
36-
Ref[F].of(SemispaceCache.empty[Statement.CacheKey, V](max)).map { ref =>
37+
def empty[F[_]: Functor: Ref.Make, V](max: Int, trackEviction: Boolean): F[StatementCache[F, V]] =
38+
Ref[F].of(SemispaceCache.empty[Statement.CacheKey, V](max, trackEviction)).map { ref =>
3739
new StatementCache[F, V] {
3840

3941
def get(k: Statement[_]): F[Option[V]] =
4042
ref.modify { c =>
4143
c.lookup(k.cacheKey) match {
42-
case Some((cʹ, v)) => (cʹ, Some(v))
43-
case None => (c, None)
44+
case Some((cʹ, v)) => (cʹ, Some(v))
45+
case None => (c, None)
4446
}
4547
}
4648

47-
private[skunk] def put(k: Statement[_], v: V): F[Unit] =
49+
def put(k: Statement[_], v: V): F[Unit] =
4850
ref.update(_.insert(k.cacheKey, v))
4951

5052
def containsKey(k: Statement[_]): F[Boolean] =
5153
ref.get.map(_.containsKey(k.cacheKey))
5254

5355
def clear: F[Unit] =
54-
ref.set(SemispaceCache.empty[Statement.CacheKey, V](max))
56+
ref.update(_.evictAll)
5557

5658
def values: F[List[V]] =
5759
ref.get.map(_.values)
60+
61+
def clearEvicted: F[List[V]] =
62+
ref.modify(_.clearEvicted)
5863
}
5964
}
60-
6165
}

0 commit comments

Comments
 (0)