Skip to content

Commit 379bd30

Browse files
fernandomora and Darren8098
authored and committed
IC-481 Migrates to cats-effect 3
1 parent 8e0113c commit 379bd30

File tree

12 files changed

+61
-124
lines changed

12 files changed

+61
-124
lines changed

.gitignore

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,4 @@
1-
# javascript specific
2-
node_modules
3-
4-
# scala specific
5-
*.class
6-
*.log
7-
8-
# sbt specific
9-
.cache
10-
.history
11-
.lib/
12-
dist/*
13-
target/
14-
lib_managed/
15-
src_managed/
16-
project/boot/
17-
project/plugins/project/
18-
19-
# intellij
1+
.bsp/
202
.idea/
21-
*.ipr
22-
*.iml
23-
24-
# Scala IDE specific
25-
.scala_dependencies
26-
.worksheet
27-
28-
# Visual Studio Code specific
29-
.bloop/
30-
.metals/
31-
project/metals.sbt
32-
.history/
33-
.vscode/
34-
35-
# environment specific
36-
.envrc
37-
38-
# Mac files
39-
.DS_Store
40-
41-
42-
# deployment secrets
43-
devel.secrets.json
44-
/dashboard/certs/
45-
/docker/dashboard/dependencies/certs
46-
47-
build.sbt.repl
48-
build.sbt.original
3+
project/target/
4+
target/

build.sbt

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ ThisBuild / homepage := Some(url("https://github.com/intenthq/action-processor-i
66
ThisBuild / developers := List(Developer("intenthq", "Intent HQ", null, url("https://www.intenthq.com/")))
77
ThisBuild / licenses := Seq(("MIT", url("http://opensource.org/licenses/MIT")))
88

9-
ThisBuild / scalaVersion := "2.13.3"
9+
ThisBuild / scalaVersion := "2.13.7"
10+
ThisBuild / semanticdbEnabled := true
11+
ThisBuild / semanticdbVersion := scalafixSemanticdb.revision
1012

11-
addCompilerPlugin("org.typelevel" % "kind-projector" % "0.11.0" cross CrossVersion.full)
13+
addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full)
1214

1315
lazy val root = (project in file("."))
1416
.settings(
@@ -18,17 +20,16 @@ lazy val root = (project in file("."))
1820
testFrameworks += new TestFramework("weaver.framework.CatsEffect"),
1921
scalafmtOnCompile := true,
2022
libraryDependencies ++= Seq(
21-
"co.fs2" %% "fs2-core" % "2.5.5",
22-
"co.fs2" %% "fs2-io" % "2.5.5",
23-
"com.google.guava" % "guava" % "30.0-jre",
24-
"com.propensive" %% "magnolia" % "0.17.0",
23+
"co.fs2" %% "fs2-core" % "3.2.2",
24+
"co.fs2" %% "fs2-io" % "3.2.2",
25+
"com.softwaremill.magnolia1_2" %% "magnolia" % "1.0.0-M7",
2526
"de.siegmar" % "fastcsv" % "1.0.3",
2627
"org.mapdb" % "mapdb" % "3.0.8",
27-
"org.tpolecat" %% "doobie-core" % "0.12.1",
28-
"org.tpolecat" %% "doobie-hikari" % "0.12.1",
29-
"com.disneystreaming" %% "weaver-cats" % "0.6.2" % Test,
30-
"com.disneystreaming" %% "weaver-core" % "0.6.2" % Test,
31-
"org.tpolecat" %% "doobie-h2" % "0.12.1" % Test
28+
"org.tpolecat" %% "doobie-core" % "1.0.0-RC1",
29+
"org.tpolecat" %% "doobie-hikari" % "1.0.0-RC1",
30+
"com.disneystreaming" %% "weaver-cats" % "0.7.7" % Test,
31+
"com.disneystreaming" %% "weaver-core" % "0.7.7" % Test,
32+
"org.tpolecat" %% "doobie-h2" % "1.0.0-RC1" % Test
3233
),
3334
/*
3435
https://github.com/sbt/sbt/issues/3249#issuecomment-534757714

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.3.13
1+
sbt.version=1.5.5

project/plugins.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.3")
22
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.1")
33
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.13")
44
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
5+
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.32")

src/main/scala/com/intenthq/StreamOps.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package com.intenthq
22

3-
import cats.effect.{Concurrent, Sync, Timer}
3+
import cats.effect.{Async, Concurrent, Sync, Temporal}
44
import cats.implicits._
55
import fs2.{Chunk, Stream}
66

77
import scala.concurrent.duration.FiniteDuration
88
import StreamOps._
9-
import cats.Parallel
109

1110
class StreamChunkOps[F[_], O](s: fs2.Stream[F, Chunk[O]]) {
1211
private val parallelismEnabled: Boolean = scala.util.Properties.envOrNone("FS2_PARALLELISM").map(_.toBoolean).getOrElse(true)
@@ -25,37 +24,37 @@ class StreamChunkOps[F[_], O](s: fs2.Stream[F, Chunk[O]]) {
2524
s.parEvalMap[F2, Stream[F2, O2]](parallelism)(chunk => f(chunk).map(fs2.Stream.chunk)).flatten
2625
}
2726

28-
class StreamOps[F[_]: Sync, O](s: fs2.Stream[F, O]) {
27+
class StreamOps[F[_], O](s: fs2.Stream[F, O]) {
2928

3029
def debugEvery(
3130
d: FiniteDuration
32-
)(formatter: O => String = _.toString, logger: String => Unit = println(_))(implicit C: Concurrent[F], T: Timer[F]): Stream[F, O] =
31+
)(formatter: O => String = _.toString, logger: String => Unit = println(_))(implicit C: Temporal[F]): Stream[F, O] =
3332
s.observe(_.debounce(d).debug(formatter, logger).drain)
3433

3534
object preservingChunks {
3635
// Parallelizes different chunks processing but not elements of same chunk (traverse instead of parTraverse)
37-
def parEvalMap[F2[x] >: F[x]: Concurrent: Parallel, O2](parallelism: Int)(f: O => F2[O2]): Stream[F2, O2] =
36+
def parEvalMap[F2[x] >: F[x]: Concurrent, O2](parallelism: Int)(f: O => F2[O2]): Stream[F2, O2] =
3837
s.chunks.parEvalMapChunks[F2, O2](parallelism)(chunk => chunk.traverse(f))
3938

4039
// Alias for parallezing non-effectful operations
41-
def parMap[O2](parallelism: Int)(f: O => O2)(implicit C: Concurrent[F], P: Parallel[F]): Stream[F, O2] =
40+
def parMap[O2](parallelism: Int)(f: O => O2)(implicit A: Async[F]): Stream[F, O2] =
4241
preservingChunks.parEvalMap(parallelism)(o => Sync[F].delay(f(o)))
4342

4443
// Parallelizes different chunks processing but not elements of same chunk (traverse instead of parTraverse)
45-
def parEvalMapUnordered[F2[x] >: F[x]: Concurrent: Parallel, O2](parallelism: Int)(f: O => F2[O2]): Stream[F2, O2] =
44+
def parEvalMapUnordered[F2[x] >: F[x]: Concurrent, O2](parallelism: Int)(f: O => F2[O2]): Stream[F2, O2] =
4645
s.chunks.parEvalMapChunksUnordered[F2, O2](parallelism)(chunk => chunk.traverse(f))
4746

4847
// Alias for parallezing non-effectful operations
49-
def parMapUnordered[O2](parallelism: Int)(f: O => O2)(implicit C: Concurrent[F], P: Parallel[F]): Stream[F, O2] =
48+
def parMapUnordered[O2](parallelism: Int)(f: O => O2)(implicit A: Async[F]): Stream[F, O2] =
5049
preservingChunks.parEvalMapUnordered(parallelism)(o => Sync[F].delay(f(o)))
5150
}
5251
}
5352

5453
trait ToStreamOps {
55-
implicit def toStreamOps[F[_]: Sync, O](fo: Stream[F, O]): StreamOps[F, O] =
54+
implicit def toStreamOps[F[_], O](fo: Stream[F, O]): StreamOps[F, O] =
5655
new StreamOps(fo)
5756

58-
implicit def toStreamChunkOps[F[_]: Sync, O](fo: fs2.Stream[F, Chunk[O]]): StreamChunkOps[F, O] =
57+
implicit def toStreamChunkOps[F[_], O](fo: fs2.Stream[F, Chunk[O]]): StreamChunkOps[F, O] =
5958
new StreamChunkOps(fo)
6059
}
6160

src/main/scala/com/intenthq/action_processor/integrations/aggregations/Aggregate.scala

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,21 @@
11
package com.intenthq.action_processor.integrations.aggregations
22

3-
import java.util.concurrent.ConcurrentMap
4-
5-
import scala.jdk.CollectionConverters._
6-
7-
import cats.effect.{Blocker, ContextShift, IO, Resource, SyncIO}
8-
3+
import cats.effect.{IO, Resource}
94
import com.intenthq.action_processor.integrations.config.MapDbSettings
105
import com.intenthq.action_processor.integrations.feeds.FeedContext
116
import com.intenthq.action_processor.integrations.repositories.MapDBRepository
12-
13-
import org.mapdb.{DataInput2, DataOutput2, HTreeMap, Serializer}
147
import org.mapdb.elsa.{ElsaMaker, ElsaSerializer}
158
import org.mapdb.serializer.GroupSerializerObjectArray
9+
import org.mapdb.{DataInput2, DataOutput2, HTreeMap, Serializer}
1610

17-
object Aggregate {
11+
import java.util.concurrent.ConcurrentMap
12+
import scala.jdk.CollectionConverters._
1813

19-
private lazy val blocker = Blocker[SyncIO].allocated.unsafeRunSync()._1
20-
private val ec = scala.concurrent.ExecutionContext.global
21-
implicit private val contextShift: ContextShift[IO] = IO.contextShift(ec)
14+
object Aggregate {
2215

2316
def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)
2417

25-
private def loadAggRepository[K](mapDbSettings: MapDbSettings)(blocker: Blocker): Resource[IO, HTreeMap[K, Long]] = {
18+
private def loadAggRepository[K](mapDbSettings: MapDbSettings): Resource[IO, HTreeMap[K, Long]] = {
2619

2720
val serializer: Serializer[K] = new GroupSerializerObjectArray[K] {
2821
val elsaSerializer: ElsaSerializer = new ElsaMaker().make
@@ -31,7 +24,7 @@ object Aggregate {
3124
}
3225

3326
MapDBRepository
34-
.load(mapDbSettings)(blocker)
27+
.load(mapDbSettings)
3528
.map(db =>
3629
db.hashMap("stuff", serializer, Serializer.LONG.asInstanceOf[Serializer[Long]])
3730
.layout(mapDbSettings.segments, mapDbSettings.nodeSize, mapDbSettings.levels)
@@ -46,9 +39,9 @@ object Aggregate {
4639
val aggregateInRepository: fs2.Pipe[IO, I, ConcurrentMap[K, Long]] =
4740
in => {
4841
fs2.Stream
49-
.resource[IO, ConcurrentMap[K, Long]](loadAggRepository(feedContext.mapDbSettings)(blocker))
42+
.resource[IO, ConcurrentMap[K, Long]](loadAggRepository(feedContext.mapDbSettings))
5043
.flatMap { aggRepository =>
51-
fs2.Stream.eval_(IO.delay(println("Starting aggregation"))) ++
44+
fs2.Stream.exec(IO.delay(println("Starting aggregation"))) ++
5245
in.evalMapChunk { o =>
5346
IO.delay {
5447
keys(o).foreach { value =>
Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,20 @@
11
package com.intenthq.action_processor.integrations.feeds
22

3-
import java.nio.file.Paths
4-
5-
import cats.effect.{Blocker, ContextShift, IO}
3+
import cats.effect.IO
4+
import fs2.io.file.{Files, Flags, Path}
65
import fs2.text
76

7+
import java.nio.file.Paths
88
import scala.util.Properties
99

1010
trait LocalFileCSVFeed[O] extends CSVFeed[O] {
1111

12-
implicit protected val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
1312
protected val localFilePath: String = Properties.envOrElse("CSV_RESOURCE", "/data.csv")
1413

1514
override protected def rows: fs2.Stream[IO, String] =
16-
fs2.Stream.resource(Blocker[IO]).flatMap { blocker =>
17-
fs2.io.file
18-
.readAll[IO](Paths.get(localFilePath), blocker, 4096)
19-
.through(text.utf8Decode)
20-
.through(text.lines)
21-
.dropLastIf(_.isEmpty)
22-
}
15+
Files[IO]
16+
.readAll(Path.fromNioPath(Paths.get(localFilePath)), 4096, Flags.Read)
17+
.through(text.utf8.decode)
18+
.through(text.lines)
19+
.dropLastIf(_.isEmpty)
2320
}

src/main/scala/com/intenthq/action_processor/integrations/feeds/SQLFeed.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.intenthq.action_processor.integrations.feeds
22

3-
import cats.effect.{ContextShift, IO}
3+
import cats.effect.IO
44
import com.intenthq.action_processor.integrations.DoobieImplicits
55
import doobie.util.query.Query0
66
import doobie.util.transactor.Transactor
@@ -12,8 +12,6 @@ trait SQLFeed[I, O] extends Feed[I, O] with DoobieImplicits {
1212
protected val jdbcUrl: String
1313
protected def query(sourceContext: FeedContext[IO]): Query0[I]
1414

15-
implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
16-
1715
protected lazy val transactor: Transactor[IO] = createTransactor
1816
protected val chunkSize: Int = doobie.util.query.DefaultChunkSize
1917

src/main/scala/com/intenthq/action_processor/integrations/repositories/MapDBRepository.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@ import java.io.File
44
import java.nio.file.{Files, Paths}
55
import java.time.LocalDateTime
66

7-
import cats.effect.{Blocker, ContextShift, IO, Resource}
7+
import cats.effect.{IO, Resource}
88
import com.intenthq.action_processor.integrations.config.MapDbSettings
99
import org.mapdb.{DB, DBMaker}
1010

1111
object MapDBRepository {
1212

13-
def load(mapDBSettings: MapDbSettings)(blocker: Blocker)(implicit cs: ContextShift[IO]): Resource[IO, DB] = {
13+
def load(mapDBSettings: MapDbSettings): Resource[IO, DB] = {
1414
val dbInitOp = for {
1515
now <- IO.delay(LocalDateTime.now())
1616
dbFile <- IO.delay(new File(Paths.get(mapDBSettings.dbPath.toString, s"dbb-${now.toLocalDate}-${now.toLocalTime}").toUri))
17-
createDb <- IO.delay {
17+
createDb <- IO.blocking {
1818
DBMaker
1919
.fileDB(dbFile.getAbsolutePath)
2020
.allocateStartSize(mapDBSettings.startDbSize)
@@ -26,6 +26,6 @@ object MapDBRepository {
2626
.make()
2727
}
2828
} yield (createDb, dbFile)
29-
Resource.make(blocker.blockOn(dbInitOp))(db => IO.delay(db._1.close()).guarantee(IO.delay(Files.deleteIfExists(db._2.toPath)).void)).map(_._1)
29+
Resource.make(dbInitOp)(db => IO.delay(db._1.close()).guarantee(IO.delay(Files.deleteIfExists(db._2.toPath)).void)).map(_._1)
3030
}
3131
}

src/main/scala/com/intenthq/action_processor/integrations/serializations/csv/Csv.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.intenthq.action_processor.integrations.serializations.csv
22

33
import java.time._
44

5-
import magnolia._
5+
import magnolia1._
66

77
trait Csv[A] {
88
def toCSV(a: A): Seq[String]
@@ -13,9 +13,9 @@ object Csv {
1313

1414
type Typeclass[A] = Csv[A]
1515

16-
def combine[A](ctx: CaseClass[Csv, A]): Csv[A] = (a: A) => ctx.parameters.flatMap(p => p.typeclass.toCSV(p.dereference(a)))
16+
def join[A](ctx: CaseClass[Csv, A]): Csv[A] = (a: A) => ctx.parameters.flatMap(p => p.typeclass.toCSV(p.dereference(a)))
1717

18-
def dispatch[A](ctx: SealedTrait[Csv, A]): Csv[A] = (a: A) => ctx.dispatch(a)(sub => sub.typeclass.toCSV(sub.cast(a)))
18+
def split[A](ctx: SealedTrait[Csv, A]): Csv[A] = (a: A) => ctx.split(a)(sub => sub.typeclass.toCSV(sub.cast(a)))
1919

2020
implicit def csvOpt[T: Csv]: Csv[Option[T]] = (a: Option[T]) => a.fold(Seq(""))(Csv[T].toCSV)
2121
implicit def csvIterable[T: Csv]: Csv[Iterable[T]] = (a: Iterable[T]) => Seq(a.map(Csv[T].toCSV).mkString(","))

0 commit comments

Comments
 (0)