Skip to content

Commit b30b35a

Browse files
Merlin Rabensmerlinrabens
authored andcommitted
#2309 Instantiate MapDB blocking
Signed-off-by: Merlin Rabens <merlin.rabens@intenthq.com>
1 parent 2ccea6a commit b30b35a

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

src/main/scala/com/intenthq/action_processor/integrations/aggregations/Aggregate.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.intenthq.action_processor.integrations.aggregations
22

33
import java.util.concurrent.ConcurrentMap
44

5-
import cats.effect.{IO, Resource}
5+
import cats.effect.{Blocker, ContextShift, IO, Resource, SyncIO}
66
import com.intenthq.action_processor.integrations.config.MapDbSettings
77
import com.intenthq.action_processor.integrations.feeds.FeedContext
88
import com.intenthq.action_processor.integrations.repositories.MapDBRepository
@@ -14,9 +14,12 @@ import scala.jdk.CollectionConverters._
1414

1515
object Aggregate {
1616

17+
private lazy val blocker = Blocker[SyncIO].allocated.unsafeRunSync()._1
18+
implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
19+
1720
def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)
1821

19-
private def loadAggRepository[K](mapDbSettings: MapDbSettings): Resource[IO, HTreeMap[K, Long]] = {
22+
private def loadAggRepository[K](mapDbSettings: MapDbSettings)(blocker: Blocker): Resource[IO, HTreeMap[K, Long]] = {
2023

2124
val serializer: Serializer[K] = new GroupSerializerObjectArray[K] {
2225
val elsaSerializer: ElsaSerializer = new ElsaMaker().make
@@ -25,7 +28,7 @@ object Aggregate {
2528
}
2629

2730
MapDBRepository
28-
.load(mapDbSettings)
31+
.load(mapDbSettings)(blocker)
2932
.map(db =>
3033
db.hashMap("stuff", serializer, Serializer.LONG.asInstanceOf[Serializer[Long]])
3134
.layout(mapDbSettings.segments, mapDbSettings.nodeSize, mapDbSettings.levels)
@@ -52,7 +55,7 @@ object Aggregate {
5255
)
5356
.map(e => (e.getKey, e.getValue))
5457

55-
fs2.Stream.resource[IO, ConcurrentMap[K, Long]](loadAggRepository(feedContext.mapDbSettings)).flatMap { aggRepository =>
58+
fs2.Stream.resource[IO, ConcurrentMap[K, Long]](loadAggRepository(feedContext.mapDbSettings)(blocker)).flatMap { aggRepository =>
5659
sourceStream.evalMap { i =>
5760
put(aggRepository, i)
5861
}.drain ++ streamKeyValue(aggRepository)
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package com.intenthq.action_processor.integrations.repositories
22

33
import java.io.File
4-
import java.nio.file.Paths
4+
import java.nio.file.{Files, Paths}
55
import java.time.LocalDateTime
66

7-
import cats.effect.{IO, Resource}
7+
import cats.effect.{Blocker, ContextShift, IO, Resource}
88
import com.intenthq.action_processor.integrations.config.MapDbSettings
99
import org.mapdb.{DB, DBMaker}
1010

1111
object MapDBRepository {
12-
def load(mapDBSettings: MapDbSettings): Resource[IO, DB] = {
13-
val dbCreated = for {
12+
13+
def load(mapDBSettings: MapDbSettings)(blocker: Blocker)(implicit cs: ContextShift[IO]): Resource[IO, DB] = {
14+
val dbInitOp = for {
1415
now <- IO.delay(LocalDateTime.now())
1516
dbFile <- IO.delay(new File(Paths.get(mapDBSettings.dbPath.toString, s"dbb-${now.toLocalDate}-${now.toLocalTime}").toUri))
16-
_ <- IO.delay(dbFile.deleteOnExit())
1717
createDb <- IO.delay {
1818
DBMaker
1919
.fileDB(dbFile.getAbsolutePath)
@@ -25,7 +25,7 @@ object MapDBRepository {
2525
.fileDeleteAfterClose()
2626
.make()
2727
}
28-
} yield createDb
29-
Resource.make(dbCreated)(db => IO.delay(db.close()))
28+
} yield (createDb, dbFile)
29+
Resource.make(blocker.blockOn(dbInitOp))(db => IO.delay(db._1.close()).guarantee(IO.delay(Files.deleteIfExists(db._2.toPath)).void)).map(_._1)
3030
}
3131
}

0 commit comments

Comments
 (0)