Skip to content

Commit 160c1c6

Browse files
authored
Merge pull request #4267 from morgen-peschke/add-nonempty-version-of-hotswap
Add NonEmptyHotswap
2 parents 6e62fc6 + 9a32ef0 commit 160c1c6

File tree

5 files changed

+376
-76
lines changed

5 files changed

+376
-76
lines changed

docs/std/hotswap.md

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,35 +10,33 @@ Constructing a new [`Resource`](./resource.md) inside the body of a
1010
until after the inner resource is released. Consider for example writing a
1111
logger that will rotate log files every `n` bytes.
1212

13-
## Hotswap
13+
## NonEmptyHotswap
1414

15-
`Hotswap` addresses this by exposing a linear sequence of resources as a single
15+
`NonEmptyHotswap` addresses this by exposing a linear sequence of resources as a single
1616
`Resource`. We can run the finalizers for the current resource and advance to
17-
the next one in the sequence using `Hotswap#swap`. An error may be raised if
17+
the next one in the sequence using `NonEmptyHotswap#swap`. An error may be raised if
1818
the previous resource in the sequence is referenced after `swap` is invoked
1919
(as the resource will have been finalized).
2020

2121
```scala
22-
sealed trait Hotswap[F[_], R] {
23-
24-
def swap(next: Resource[F, R]): F[R]
22+
sealed trait NonEmptyHotswap[F[_], R] {
23+
def swap(next: Resource[F, R]): F[Unit]
24+
def get: Resource[F, R]
2525
}
2626
```
2727

2828
A rotating logger would then look something like this:
2929

3030
```scala
31-
def rotating(n: Int): Resource[IO, Logger[IO]] =
32-
Hotswap.create[IO, File].flatMap { hs =>
33-
def file(name: String): Resource[IO, File] = ???
34-
def write(file: File, msg: String): IO[Unit] = ???
31+
def rotating(n: Int): Resource[IO, Logger[IO]] = {
32+
def file(name: String): Resource[IO, File] = ???
33+
def write(file: File, msg: String): IO[Unit] = ???
3534

35+
NonEmptyHotswap[IO, File](file("0.log").flatMap { hs =>
3636
Resource.eval {
3737
for {
3838
index <- Ref[IO].of(0)
3939
count <- Ref[IO].of(0)
40-
// Open the initial log file
41-
_ <- hs.swap(file("0.log"))
4240
} yield new Logger[IO] {
4341
def log(msg: String): IO[Unit] =
4442
count.get.flatMap { currentCount =>
@@ -61,4 +59,5 @@ def rotating(n: Int): Resource[IO, Logger[IO]] =
6159
}
6260
}
6361
}
64-
```
62+
}
63+
```

std/shared/src/main/scala/cats/effect/std/Hotswap.scala

Lines changed: 12 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
package cats.effect.std
1818

19-
import cats.effect.kernel.{Concurrent, Ref, Resource}
20-
import cats.effect.kernel.syntax.all._
19+
import cats.effect.kernel.{Concurrent, Resource}
2120
import cats.syntax.all._
2221

2322
/**
@@ -51,6 +50,7 @@ import cats.syntax.all._
5150
*
5251
* Ported from https://github.com/typelevel/fs2.
5352
*/
53+
@deprecated("Use NonEmptyHotswap", "3.7.0")
5454
sealed trait Hotswap[F[_], R] {
5555

5656
/**
@@ -94,78 +94,27 @@ object Hotswap {
9494
* Creates a new [[Hotswap]] initialized with the specified resource. The [[Hotswap]] instance
9595
* and the initial resource are returned.
9696
*/
97+
@deprecated("Use NonEmptyHotswap.apply", "3.7.0")
9798
def apply[F[_]: Concurrent, R](initial: Resource[F, R]): Resource[F, (Hotswap[F, R], R)] =
9899
create[F, R].evalMap(hotswap => hotswap.swap(initial).tupleLeft(hotswap))
99100

100101
/**
101102
* Creates a new [[Hotswap]], which represents a [[cats.effect.kernel.Resource]] that can be
102103
* swapped during the lifetime of this [[Hotswap]].
103104
*/
105+
@deprecated("Use NonEmptyHotswap.empty", "3.7.0")
104106
def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap[F, R]] =
105-
Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore =>
106-
sealed abstract class State
107-
case object Cleared extends State
108-
case class Acquired(r: R, fin: F[Unit]) extends State
109-
case object Finalized extends State
110-
111-
def initialize: F[Ref[F, State]] =
112-
F.ref(Cleared)
113-
114-
def finalize(state: Ref[F, State]): F[Unit] =
115-
state.getAndSet(Finalized).flatMap {
116-
case Acquired(_, finalizer) => exclusive.surround(finalizer)
117-
case Cleared => F.unit
118-
case Finalized => raise("Hotswap already finalized")
107+
NonEmptyHotswap.empty[F, R].map { nes =>
108+
new Hotswap[F, R] {
109+
override def swap(next: Resource[F, R]): F[R] = {
110+
// Warning: this leaks the contents of the Resource.
111+
// This is done intentionally to satisfy the mistakes of the old API
112+
nes.swap(next.map(_.some)) *> get.use(_.get.pure[F])
119113
}
120114

121-
def raise(message: String): F[Unit] =
122-
F.raiseError[Unit](new RuntimeException(message))
123-
124-
def exclusive: Resource[F, Unit] =
125-
Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ =>
126-
semaphore.releaseN(Long.MaxValue))
127-
128-
Resource.make(initialize)(finalize).map { state =>
129-
new Hotswap[F, R] {
130-
131-
override def swap(next: Resource[F, R]): F[R] =
132-
F.uncancelable { poll =>
133-
poll(next.allocated).flatMap {
134-
case (r, fin) =>
135-
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
136-
swapFinalizer(Acquired(r, fin)).as(r)
137-
}
138-
}
139-
}
115+
override def get: Resource[F, Option[R]] = nes.getOpt
140116

141-
override def get: Resource[F, Option[R]] =
142-
Resource.makeFull[F, Option[R]] { poll =>
143-
poll(semaphore.acquire) *> // acquire shared lock
144-
state.get.flatMap {
145-
case Acquired(r, _) => F.pure(Some(r))
146-
case _ => semaphore.release.as(None)
147-
}
148-
} { r => if (r.isDefined) semaphore.release else F.unit }
149-
150-
override def clear: F[Unit] =
151-
exclusive.surround(swapFinalizer(Cleared).uncancelable)
152-
153-
private def swapFinalizer(next: State): F[Unit] =
154-
state.modify {
155-
case Acquired(_, fin) =>
156-
next -> fin
157-
case Cleared =>
158-
next -> F.unit
159-
case Finalized =>
160-
val fin = next match {
161-
case Acquired(_, fin) => fin
162-
case _ => F.unit
163-
}
164-
Finalized -> (fin *> raise("Cannot swap after finalization"))
165-
}.flatten
166-
167-
}
117+
override def clear: F[Unit] = nes.clear
168118
}
169119
}
170-
171120
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright 2020-2025 Typelevel
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package cats.effect.std
18+
19+
import cats.effect.kernel.{Concurrent, MonadCancel, Ref, Resource}
20+
import cats.effect.kernel.Resource.ExitCase.Succeeded
21+
import cats.syntax.all._
22+
23+
/**
24+
* A concurrent data structure that exposes a linear sequence of `R` resources as a single
25+
* [[cats.effect.kernel.Resource]] in `F` without accumulation.
26+
*
27+
* A [[NonEmptyHotswap]] is allocated within a [[cats.effect.kernel.Resource]] that dictates the
28+
* scope of its lifetime. After creation, a `Resource[F, R]` can be swapped in by calling
29+
* [[swap]]. The newly acquired resource is returned and is released either when the
30+
* [[NonEmptyHotswap]] is finalized or upon the next call to [[swap]], whichever occurs first.
31+
*
32+
* The following diagram illustrates the linear allocation and release of three resources `r1`,
33+
* `r2`, and `r3` cycled through [[NonEmptyHotswap]]:
34+
*
35+
* {{{
36+
* create(r1) ----- swap(r2) ---- swap(r3) ---- X
37+
* | | | |
38+
* r1 acquired | | |
39+
* r2 acquired | |
40+
* r1 released r3 acquired |
41+
* r2 released |
42+
* r3 released
43+
* }}}
44+
*
45+
* [[NonEmptyHotswap]] is particularly useful when working with effects that cycle through
46+
* resources, like writing bytes to files or rotating files every N bytes or M seconds. Without
47+
* [[NonEmptyHotswap]], such effects leak resources: on each file rotation, a file handle or
48+
* some internal resource handle accumulates. With [[NonEmptyHotswap]], the only registered
49+
* resource is the [[NonEmptyHotswap]] itself, and each file is swapped in only after swapping
50+
* the previous one out.
51+
*
52+
* Replaces the deprecated [[Hotswap]] with a safer API.
53+
*/
54+
sealed trait NonEmptyHotswap[F[_], R] {
55+
56+
/**
57+
* Allocates a new resource and closes the previous one.
58+
*
59+
* When the lifetime of the [[NonEmptyHotswap]] is completed, the resource allocated by the
60+
* most recent [[swap]] will be finalized.
61+
*
62+
* [[swap]] finalizes the previous resource immediately, so users must ensure that the old `R`
63+
* is not used thereafter. Failure to do so may result in an error on the _consumer_ side. In
64+
* any case, no resources will be leaked.
65+
*
66+
* To access the current resource, use [[get]], which guarantees that it will not be released
67+
* while it is being used.
68+
*
69+
* If [[swap]] is called after the lifetime of the [[NonEmptyHotswap]] is over, it will raise
70+
* an error, but will ensure that all resources are finalized before returning.
71+
*/
72+
def swap(next: Resource[F, R]): F[Unit]
73+
74+
/**
75+
* Gets the current resource. The returned resource is guaranteed to be available for the
76+
* duration of the returned resource.
77+
*/
78+
def get: Resource[F, R]
79+
}
80+
81+
object NonEmptyHotswap {
82+
83+
/**
84+
* Creates a new [[NonEmptyHotswap]] initialized with the specified resource, which represents
85+
* a [[cats.effect.kernel.Resource]] that can be swapped during the lifetime of this
86+
* [[NonEmptyHotswap]].
87+
*/
88+
def apply[F[_], R](initial: Resource[F, R])(
89+
implicit F: Concurrent[F]): Resource[F, NonEmptyHotswap[F, R]] =
90+
Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore =>
91+
sealed abstract class State
92+
case class Acquired(r: R, fin: F[Unit]) extends State
93+
case object Finalized extends State
94+
95+
def initialize: F[Ref[F, State]] =
96+
F.uncancelable { poll =>
97+
poll(initial.allocated).flatMap {
98+
case (r, fin) =>
99+
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
100+
F.ref(Acquired(r, fin))
101+
}
102+
}
103+
}
104+
105+
def finalize(state: Ref[F, State]): F[Unit] =
106+
state.getAndSet(Finalized).flatMap {
107+
case Acquired(_, finalizer) => exclusive.surround(finalizer)
108+
case Finalized => raise("NonEmptyHotswap already finalized")
109+
}
110+
111+
def raise[A](message: String): F[A] =
112+
F.raiseError[A](new IllegalStateException(message))
113+
114+
def exclusive: Resource[F, Unit] =
115+
Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ =>
116+
semaphore.releaseN(Long.MaxValue))
117+
118+
Resource.make(initialize)(finalize).map { state =>
119+
new NonEmptyHotswap[F, R] {
120+
121+
override def swap(next: Resource[F, R]): F[Unit] =
122+
F.uncancelable { poll =>
123+
poll(next.allocated).flatMap {
124+
case (r, fin) =>
125+
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
126+
swapFinalizer(Acquired(r, fin))
127+
}
128+
}
129+
}
130+
131+
override def get: Resource[F, R] =
132+
Resource.makeFull[F, R] { poll =>
133+
poll(semaphore.acquire) *> // acquire shared lock
134+
state.get.flatMap {
135+
case Acquired(r, _) => F.pure(r)
136+
case _ => raise("Hotswap already finalized")
137+
}
138+
}(_ => semaphore.release)
139+
140+
private def swapFinalizer(next: State): F[Unit] =
141+
state.flatModify {
142+
case Acquired(_, fin) =>
143+
next -> fin
144+
case Finalized =>
145+
val fin = next match {
146+
case Acquired(_, fin) => fin
147+
case _ => F.unit
148+
}
149+
Finalized -> (fin *> raise[Unit]("Cannot swap after finalization"))
150+
}
151+
}
152+
}
153+
}
154+
155+
/**
156+
* Creates a [[NonEmptyHotswap]] of `Resource[F, Option[R]]` containing a `None`
157+
*/
158+
def empty[F[_], R](implicit F: Concurrent[F]): Resource[F, NonEmptyHotswap[F, Option[R]]] =
159+
apply[F, Option[R]](Resource.pure(none))
160+
161+
implicit final class NonEmptyHotswapOptionalResourcesOpt[F[_], R](
162+
private val hs: NonEmptyHotswap[F, Option[R]])
163+
extends AnyVal {
164+
165+
/**
166+
* When the [[cats.effect.kernel.Resource]] contained by a [[NonEmptyHotswap]] is wrapped in
167+
* an [[scala.Option]] it is not desirable to prevent calls to [[NonEmptyHotswap.swap]] when
168+
* the [[cats.effect.kernel.Resource Resource]] contains [[scala.None]].
169+
*
170+
* [[getOpt]] preserves this behavior from [[Hotswap.get]]
171+
*/
172+
def getOpt(implicit F: MonadCancel[F, Throwable]): Resource[F, Option[R]] =
173+
Resource.applyFull[F, Option[R]] { poll =>
174+
poll(hs.get.allocatedCase).flatMap {
175+
case (None, fin) => fin(Succeeded) *> F.pure((None, _ => F.unit))
176+
case (r, fin) => F.pure((r, fin))
177+
}
178+
}
179+
180+
def clear: F[Unit] = hs.swap(Resource.pure(none))
181+
}
182+
}

tests/shared/src/test/scala/cats/effect/std/HotswapSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import cats.effect.unsafe.IORuntimeConfig
2525

2626
import scala.concurrent.duration._
2727

28+
@deprecated("Hotswap deprecated", "3.7.0")
2829
class HotswapSuite extends BaseSuite { outer =>
2930

3031
def logged(log: Ref[IO, List[String]], name: String): Resource[IO, Unit] =

0 commit comments

Comments
 (0)