Skip to content

Commit 9cdd5ac

Browse files
committed
IC-445 Refactor action processor feeds
1 parent 379bd30 commit 9cdd5ac

File tree

13 files changed

+134
-75
lines changed

13 files changed

+134
-75
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ lazy val root = (project in file("."))
2727
"org.mapdb" % "mapdb" % "3.0.8",
2828
"org.tpolecat" %% "doobie-core" % "1.0.0-RC1",
2929
"org.tpolecat" %% "doobie-hikari" % "1.0.0-RC1",
30+
"org.tpolecat" %% "doobie-postgres" % "1.0.0-RC1",
3031
"com.disneystreaming" %% "weaver-cats" % "0.7.7" % Test,
3132
"com.disneystreaming" %% "weaver-core" % "0.7.7" % Test,
3233
"org.tpolecat" %% "doobie-h2" % "1.0.0-RC1" % Test

src/main/scala/com/intenthq/action_processor/integrations/DoobieImplicits.scala

Lines changed: 0 additions & 39 deletions
This file was deleted.

src/main/scala/com/intenthq/action_processor/integrations/feeds/CSVFeed.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import fs2.Stream
99

1010
import scala.jdk.CollectionConverters._
1111

12-
trait CSVFeed[O] extends Feed[Iterable[String], O] {
12+
trait ParseCSVInput[O] extends Feed[Iterable[String], O] {
1313

1414
protected lazy val csvReader: CsvReader = new CsvReader
1515

src/main/scala/com/intenthq/action_processor/integrations/feeds/HiveFeed.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package com.intenthq.action_processor.integrations.feeds
22

33
import cats.effect.IO
4-
import com.intenthq.action_processor.integrations.{JavaLegacyTimeMeta, TimeMeta}
4+
import com.intenthq.action_processor.integrations.implicits.DoobieImplicits
55
import doobie.util.transactor.{Strategy, Transactor}
66

77
import scala.util.Properties
88

9-
trait HiveFeed[I, O] extends SQLFeed[I, O] with TimeMeta with JavaLegacyTimeMeta {
9+
trait HiveFeed[I, O] extends SQLFeed[I, O] with DoobieImplicits.javatime.legacy {
1010
override protected val driver: String = "org.apache.hive.jdbc.HiveDriver"
1111
override protected lazy val transactor: Transactor[IO] = Transactor.strategy.set(createTransactor, Strategy.void)
1212
override protected val jdbcUrl: String = Properties.envOrElse("HIVE_JDBC_URL", "jdbc:hive2://localhost:10000")

src/main/scala/com/intenthq/action_processor/integrations/feeds/LocalFileCSVFeed.scala

Lines changed: 0 additions & 20 deletions
This file was deleted.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.intenthq.action_processor.integrations.feeds
2+
3+
import cats.effect.IO
4+
import fs2.Pipe
5+
import fs2.io.file.{Files, Flags, Path}
6+
7+
import java.nio.file.Paths
8+
import scala.annotation.unused
9+
import scala.util.Properties
10+
11+
trait LocalFileFeed[I, O] extends Feed[I, O] {
12+
13+
protected val localFilePath: String = Properties.envOrElse("CSV_RESOURCE", "/data.csv")
14+
protected val parseInput: Pipe[IO, Byte, I]
15+
16+
override def inputStream(@unused feedContext: FeedContext[IO]): fs2.Stream[IO, I] =
17+
Files[IO]
18+
.readAll(Path.fromNioPath(Paths.get(localFilePath)), 64 * 1024, Flags.Read)
19+
.through(parseInput)
20+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.intenthq.action_processor.integrations.feeds
2+
3+
import com.intenthq.action_processor.integrations.implicits.PgDoobieImplicits
4+
5+
import scala.util.Properties
6+
7+
trait PostgresFeed[I, O] extends SQLFeed[I, O] with PgDoobieImplicits {
8+
override val driver: String = "org.postgresql.Driver"
9+
override protected val jdbcUrl: String = Properties.envOrElse("POSTGRESQL_JDBC_URL", "jdbc:postgresql://localhost:5432/database")
10+
}

src/main/scala/com/intenthq/action_processor/integrations/feeds/SQLFeed.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.intenthq.action_processor.integrations.feeds
22

33
import cats.effect.IO
4-
import com.intenthq.action_processor.integrations.DoobieImplicits
4+
import com.intenthq.action_processor.integrations.implicits.DoobieImplicits
55
import doobie.util.query.Query0
66
import doobie.util.transactor.Transactor
77
import doobie.util.transactor.Transactor.Aux
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.intenthq.action_processor.integrations.implicits
2+
3+
import doobie.Meta
4+
5+
trait DoobieImplicits
6+
// https://github.com/tpolecat/doobie/blob/series/0.13.x/modules/core/src/main/scala/doobie/package.scala#L21
7+
extends doobie.free.Instances
8+
with doobie.syntax.AllSyntax
9+
with doobie.util.meta.MetaConstructors
10+
with doobie.util.meta.SqlMetaInstances
11+
with CatsImplicits {
12+
val fragments: doobie.util.fragments.type = doobie.util.fragments
13+
}
14+
15+
object DoobieImplicits {
16+
object javatime {
17+
trait legacy
18+
extends doobie.util.meta.LegacyInstantMetaInstance
19+
with doobie.util.meta.LegacyLocalDateMetaInstance
20+
with LegacyLocalTimeMetaInstance
21+
with LegacyLocalDateTimeMetaInstance
22+
23+
trait drivernative extends doobie.util.meta.MetaConstructors with doobie.util.meta.TimeMetaInstances
24+
}
25+
}
26+
27+
trait LegacyLocalTimeMetaInstance {
28+
implicit val JavaLocalTimeMeta: Meta[java.time.LocalTime] =
29+
doobie.implicits.javasql.TimeMeta.imap(_.toLocalTime)(java.sql.Time.valueOf)
30+
}
31+
32+
trait LegacyLocalDateTimeMetaInstance {
33+
implicit val JavaTimeLocalDateTimeMeta: Meta[java.time.LocalDateTime] =
34+
doobie.implicits.javasql.TimestampMeta.imap(_.toLocalDateTime)(java.sql.Timestamp.valueOf)
35+
}
36+
trait CatsImplicits
37+
// https://github.com/typelevel/cats/blob/main/core/src/main/scala/cats/implicits.scala#L3
38+
extends cats.syntax.AllSyntax
39+
with cats.instances.AllInstances
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.intenthq.action_processor.integrations.implicits
2+
3+
trait PgDoobieImplicits
4+
// https://github.com/tpolecat/doobie/blob/series/0.13.x/modules/postgres/src/main/scala/doobie/postgres/package.scala
5+
extends doobie.postgres.Instances
6+
with doobie.postgres.free.Instances
7+
with doobie.postgres.JavaTimeInstances
8+
with doobie.postgres.syntax.ToPostgresMonadErrorOps
9+
with doobie.postgres.syntax.ToFragmentOps
10+
with doobie.postgres.syntax.ToPostgresExplainOps

0 commit comments

Comments
 (0)