Skip to content

Add AtomicMap #4424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: series/3.x
Choose a base branch
from
Open
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,12 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform)
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.std.Mutex#ConcurrentImpl.EmptyCell"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.std.Mutex#ConcurrentImpl.LockQueueCell")
"cats.effect.std.Mutex#ConcurrentImpl.LockQueueCell"),
// #4424, refactored private classes
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"cats.effect.std.AtomicCell#AsyncImpl.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"cats.effect.std.AtomicCell#ConcurrentImpl.this")
)
)
.jsSettings(
Expand Down
47 changes: 27 additions & 20 deletions docs/std/atomic-cell.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ title: Atomic Cell

A synchronized, concurrent, mutable reference.

Provides safe concurrent access and modification of its contents, by ensuring only one fiber
can operate on them at the time. Thus, all operations except `get` may semantically block the
calling fiber.

```scala mdoc:silent
abstract class AtomicCell[F[_], A] {
def get: F[A]
Expand All @@ -20,34 +16,45 @@ abstract class AtomicCell[F[_], A] {
}
```

Provides safe concurrent access and modification of its contents, by ensuring only one fiber
can operate on them at the time. Thus, all operations except `get` may semantically block the
calling fiber.

## Using `AtomicCell`

The `AtomicCell` can be treated as a combination of `Mutex` and `Ref`:

```scala mdoc:reset:silent
import cats.effect.{IO, Ref}
import cats.effect.std.Mutex
import cats.effect.IO
import cats.effect.std.AtomicCell

trait State
class Service(mtx: Mutex[IO], ref: Ref[IO, State]) {
def modify(f: State => IO[State]): IO[Unit] =
mtx.lock.surround {
for {
current <- ref.get
next <- f(current)
_ <- ref.set(next)
} yield ()
}

class Service(cell: AtomicCell[IO, State]) {
def modify(f: State => IO[State]): IO[Unit] =
cell.evalUpdate(f)
}
```

The following is the equivalent of the example above:
### Example

Imagine a random data generator,
that requires running some effectual operations _(e.g. checking a database)_
to produce a new value.
In that case, it may be better to block than to repeat the operation.

```scala mdoc:reset:silent
import cats.effect.IO
import cats.effect.std.AtomicCell

trait State
class Service(cell: AtomicCell[IO, State]) {
def modify(f: State => IO[State]): IO[Unit] =
cell.evalUpdate(current => f(current))
trait Data

class RandomDataGenerator(cell: AtomicCell[IO, Data]) {
// Generates a new random value.
def next: IO[Data] =
cell.evalUpdateAndGet(generate)

private def generate(previous: Data): IO[Data] =
???
}
```
78 changes: 78 additions & 0 deletions docs/std/atomic-map.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
---
id: atomic-map
title: Atomic Map
---

A total map from `K` to `AtomicCell[F, V]`.

```scala mdoc:silent
import cats.effect.std.AtomicCell

trait AtomicMap[F[_], K, V] {
/**
* Access the AtomicCell for the given key.
*/
def apply(key: K): AtomicCell[F, V]
}
```

It is conceptually similar to a `AtomicMap[F, Map[K, V]]`, but with better ergonomics when
working on a per key basis. Note, however, that it does not support atomic updates to
multiple keys.

Additionally, it also provide less contention: since all operations are performed on
individual key-value pairs, the pairs can be sharded by key. Thus, multiple concurrent
updates may be executed independently to each other, as long as their keys belong to
different shards.

## Using `AtomicMap`

You can think of a `AtomicMap` like a `MapRef` that supports effectual updates by locking the underlying `Ref`.

```scala mdoc:reset:silent
import cats.effect.IO
import cats.effect.std.AtomicMap

trait State
trait Key

class Service(am: AtomicMap[IO, Key, State]) {
def modify(key: Key)(f: State => IO[State]): IO[Unit] =
am(key).evalUpdate(f)
}
```

### Example

Imagine a parking tower,
where users have access to specific floors,
and getting a parking space involves an effectual operation _(e.g. a database call)_.
In that case, it may be better to block than repeat the operation,
but without blocking operations on different floors.

```scala mdoc:reset:silent
import cats.effect.IO
import cats.effect.std.AtomicMap

trait Car
trait Floor
trait ParkingSpace

class ParkingTowerService(state: AtomicMap[IO, Floor, List[ParkingSpace]]) {
// Tries to park the given Car in the solicited Floor.
// Returns either the assigned ParkingSpace, or None if this Floor is full.
def parkCarInFloor(floor: Floor, car: Car): IO[Option[ParkingSpace]] =
state(key = floor).evalModify {
case firstFreeParkingSpace :: remainingParkingSpaces =>
markParkingSpaceAsUsed(parkingSpace = firstFreeParkingSpace, car).as(
remainingParkingSpaces -> Some(firstFreeParkingSpace)
)

case Nil =>
IO.pure(List.empty -> None)
}

private def markParkingSpaceAsUsed(parkingSpace: ParkingSpace, car: Car): IO[Unit] =
???
}
```
163 changes: 127 additions & 36 deletions std/shared/src/main/scala/cats/effect/std/AtomicCell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,35 +153,29 @@ object AtomicCell {
of(M.empty)(F)
}

private[effect] def async[F[_], A](init: A)(implicit F: Async[F]): F[AtomicCell[F, A]] =
Mutex.apply[F].map(mutex => new AsyncImpl(init, mutex))

private[effect] def concurrent[F[_], A](init: A)(
implicit F: Concurrent[F]): F[AtomicCell[F, A]] =
(Ref.of[F, A](init), Mutex.apply[F]).mapN { (ref, m) => new ConcurrentImpl(ref, m) }
private[effect] def async[F[_], A](
init: A
)(
implicit F: Async[F]
): F[AtomicCell[F, A]] =
Mutex.apply[F].map(mutex => new AsyncImpl(init, lock = mutex.lock))

private final class ConcurrentImpl[F[_], A](
ref: Ref[F, A],
mutex: Mutex[F]
private[effect] def concurrent[F[_], A](
init: A
)(
implicit F: Concurrent[F]
) extends AtomicCell[F, A] {
override def get: F[A] = ref.get

override def set(a: A): F[Unit] =
mutex.lock.surround(ref.set(a))
): F[AtomicCell[F, A]] =
(Ref.of[F, A](init), Mutex.apply[F]).mapN { (ref, mutex) =>
new ConcurrentImpl(ref, lock = mutex.lock)
}

// Provides common implementations for derived methods that depend on F being an applicative.
private[effect] sealed abstract class CommonImpl[F[_], A](
implicit F: Applicative[F]
) extends AtomicCell[F, A] {
Comment on lines +172 to +175
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, the implementations here would be part of the trait itself.
But since the trait doesn't know that there will be an Applicative[F] in scope, we can't provide them there.


Name suggestions are welcome :)

override def modify[B](f: A => (A, B)): F[B] =
evalModify(a => F.pure(f(a)))

override def evalModify[B](f: A => F[(A, B)]): F[B] =
mutex.lock.surround {
ref.get.flatMap(f).flatMap {
case (a, b) =>
ref.set(a).as(b)
}
}

override def evalUpdate(f: A => F[A]): F[Unit] =
evalModify(a => f(a).map(aa => (aa, ())))

Expand All @@ -192,12 +186,33 @@ object AtomicCell {
evalModify(a => f(a).map(aa => (aa, aa)))
}

private final class AsyncImpl[F[_], A](
private[effect] final class ConcurrentImpl[F[_], A](
ref: Ref[F, A],
lock: Resource[F, Unit]
)(
implicit F: Concurrent[F]
) extends CommonImpl[F, A] {
override def get: F[A] =
ref.get

override def set(a: A): F[Unit] =
lock.surround(ref.set(a))

override def evalModify[B](f: A => F[(A, B)]): F[B] =
lock.surround {
ref.get.flatMap(f).flatMap {
case (a, b) =>
ref.set(a).as(b)
}
}
}

private[effect] final class AsyncImpl[F[_], A](
init: A,
mutex: Mutex[F]
lock: Resource[F, Unit]
)(
implicit F: Async[F]
) extends AtomicCell[F, A] {
) extends CommonImpl[F, A] {
@volatile private var cell: A = init

override def get: F[A] =
Expand All @@ -206,17 +221,14 @@ object AtomicCell {
}

override def set(a: A): F[Unit] =
mutex.lock.surround {
lock.surround {
F.delay {
cell = a
}
}

override def modify[B](f: A => (A, B)): F[B] =
evalModify(a => F.pure(f(a)))

override def evalModify[B](f: A => F[(A, B)]): F[B] =
mutex.lock.surround {
lock.surround {
F.delay(cell).flatMap(f).flatMap {
case (a, b) =>
F.delay {
Expand All @@ -225,14 +237,93 @@ object AtomicCell {
}
}
}
}

override def evalUpdate(f: A => F[A]): F[Unit] =
evalModify(a => f(a).map(aa => (aa, ())))
/**
* Allows seeing a `AtomicCell[F, Option[A]]` as a `AtomicCell[F, A]`. This is useful not only
* for ergonomic reasons, but because some implementations may save space.
*
* Setting the `default` value is the same as storing a `None` in the underlying `AtomicCell`.
*/
def defaultedAtomicCell[F[_], A](
Comment on lines +242 to +248
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inspired by MapRef.defaultedRef

atomicCell: AtomicCell[F, Option[A]],
default: A
)(
implicit F: Applicative[F]
): AtomicCell[F, A] =
new DefaultedAtomicCell[F, A](atomicCell, default)

override def evalGetAndUpdate(f: A => F[A]): F[A] =
evalModify(a => f(a).map(aa => (aa, a)))
private[effect] final class DefaultedAtomicCell[F[_], A](
atomicCell: AtomicCell[F, Option[A]],
default: A
)(
implicit F: Applicative[F]
) extends CommonImpl[F, A] {
override def get: F[A] =
atomicCell.get.map(_.getOrElse(default))

override def evalUpdateAndGet(f: A => F[A]): F[A] =
evalModify(a => f(a).map(aa => (aa, aa)))
override def set(a: A): F[Unit] =
if (a == default) atomicCell.set(None) else atomicCell.set(Some(a))

override def evalModify[B](f: A => F[(A, B)]): F[B] =
atomicCell.evalModify { opt =>
val a = opt.getOrElse(default)
f(a).map {
case (result, b) =>
if (result == default) (None, b) else (Some(result), b)
}
}
}

implicit def atomicCellOptionSyntax[F[_], A](
atomicCell: AtomicCell[F, Option[A]]
)(
implicit F: Applicative[F]
): AtomicCellOptionOps[F, A] =
new AtomicCellOptionOps(atomicCell)

final class AtomicCellOptionOps[F[_], A] private[effect] (
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how useful these extensions are, but it felt good to have them for completeness of the API.

atomicCell: AtomicCell[F, Option[A]]
)(
implicit F: Applicative[F]
) {
def getOrElse(default: A): F[A] =
atomicCell.get.map(_.getOrElse(default))

def unset: F[Unit] =
atomicCell.set(None)

def setValue(a: A): F[Unit] =
atomicCell.set(Some(a))

def modifyValueIfSet[B](f: A => (A, B)): F[Option[B]] =
evalModifyValueIfSet(a => F.pure(f(a)))

def evalModifyValueIfSet[B](f: A => F[(A, B)]): F[Option[B]] =
atomicCell.evalModify {
case None =>
F.pure((None, None))

case Some(a) =>
f(a).map {
case (result, b) =>
(Some(result), Some(b))
}
}

def updateValueIfSet(f: A => A): F[Unit] =
evalUpdateValueIfSet(a => F.pure(f(a)))

def evalUpdateValueIfSet(f: A => F[A]): F[Unit] =
atomicCell.evalUpdate {
case None => F.pure(None)
case Some(a) => f(a).map(Some.apply)
}

def getAndSetValue(a: A): F[Option[A]] =
atomicCell.getAndSet(Some(a))

def withDefaultValue(default: A): AtomicCell[F, A] =
defaultedAtomicCell(atomicCell, default)
}
}
Loading
Loading