Skip to content

Commit 7d003a5

Browse files
fernandomoramerlinrabens
authored andcommitted
Cleans tests
1 parent 44aeecb commit 7d003a5

File tree

16 files changed

+245
-206
lines changed

16 files changed

+245
-206
lines changed

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ ThisBuild / licenses := Seq(("MIT", url("http://opensource.org/licenses/MIT")))
88

99
ThisBuild / scalaVersion := "2.13.3"
1010

11+
addCompilerPlugin("org.typelevel" % "kind-projector" % "0.11.0" cross CrossVersion.full)
12+
1113
lazy val root = (project in file("."))
1214
.settings(
1315
name := "action-processor-integrations",

src/main/scala/com/intenthq/action_processor/integrationsV2/Feed.scala

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/main/scala/com/intenthq/action_processor/integrationsV2/Aggregate.scala renamed to src/main/scala/com/intenthq/action_processor/integrationsV2/aggregations/Aggregate.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
package com.intenthq.action_processor.integrationsV2
1+
package com.intenthq.action_processor.integrationsV2.aggregations
22

33
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
44

55
import cats.effect.{IO, Resource}
66

7-
import scala.jdk.CollectionConverters.IteratorHasAsScala
7+
import scala.jdk.CollectionConverters._
88

99
object Aggregate {
1010

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.intenthq.action_processor.integrationsV2.aggregations
2+
3+
import cats.effect.IO
4+
import com.intenthq.action_processor.integrationsV2.feeds.Feed
5+
6+
trait NoAggregate[I] {
7+
self: Feed[I, I] =>
8+
override def transform: fs2.Pipe[IO, I, (I, Long)] = Aggregate.noop
9+
}

src/main/scala/com/intenthq/action_processor/integrationsV2/CsvFeed.scala renamed to src/main/scala/com/intenthq/action_processor/integrationsV2/feeds/CSVFeed.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
1-
package com.intenthq.action_processor.integrationsV2
1+
package com.intenthq.action_processor.integrationsV2.feeds
22

33
import java.io.StringReader
44

55
import cats.effect.{IO, Resource}
66
import cats.implicits.catsSyntaxApplicativeId
7+
import com.intenthq.action_processor.integrations.SourceContext
78
import de.siegmar.fastcsv.reader.CsvReader
89
import fs2.Stream
910

1011
import scala.jdk.CollectionConverters._
1112

12-
abstract class CsvFeed[O] extends Feed[Iterable[String], O] {
13+
trait CSVFeed[O] extends Feed[Iterable[String], O] {
1314

1415
protected lazy val csvReader: CsvReader = new CsvReader
1516

@@ -23,7 +24,7 @@ abstract class CsvFeed[O] extends Feed[Iterable[String], O] {
2324
.pure[IO]
2425
}
2526

26-
override def inputStream: Stream[IO, Iterable[String]] =
27+
override def inputStream(sourceContext: SourceContext[IO]): Stream[IO, Iterable[String]] =
2728
rows.evalMap(csvParse)
2829

2930
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.intenthq.action_processor.integrationsV2.feeds
2+
3+
import cats.effect.IO
4+
import com.intenthq.action_processor.integrations.SourceContext
5+
6+
trait Feed[I, O] {
7+
8+
def inputStream(sourceContext: SourceContext[IO]): fs2.Stream[IO, I]
9+
def transform: fs2.Pipe[IO, I, (O, Long)]
10+
def serialize(o: O, counter: Long): Array[Byte]
11+
12+
final def stream(sourceContext: SourceContext[IO]): fs2.Stream[IO, Array[Byte]] =
13+
inputStream(sourceContext)
14+
.through(transform)
15+
.map((serialize _).tupled)
16+
}

src/main/scala/com/intenthq/action_processor/integrationsV2/HiveFeed.scala renamed to src/main/scala/com/intenthq/action_processor/integrationsV2/feeds/HiveFeed.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.intenthq.action_processor.integrationsV2
1+
package com.intenthq.action_processor.integrationsV2.feeds
22

33
import cats.effect.IO
44
import com.intenthq.action_processor.integrations.{JavaLegacyTimeMeta, TimeMeta}

src/main/scala/com/intenthq/action_processor/integrationsV2/LocalFileCsvFeed.scala renamed to src/main/scala/com/intenthq/action_processor/integrationsV2/feeds/LocalFileCSVFeed.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.intenthq.action_processor.integrationsV2
1+
package com.intenthq.action_processor.integrationsV2.feeds
22

33
import java.nio.file.Paths
44

@@ -7,7 +7,7 @@ import fs2.text
77

88
import scala.util.Properties
99

10-
trait LocalFileCsvFeed[O] extends CsvFeed[O] {
10+
trait LocalFileCSVFeed[O] extends CSVFeed[O] {
1111

1212
implicit protected val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
1313
protected val localFilePath: String = Properties.envOrElse("CSV_RESOURCE", "/data.csv")
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
package com.intenthq.action_processor.integrationsV2
1+
package com.intenthq.action_processor.integrationsV2.feeds
22

33
import cats.effect.{ContextShift, IO}
4-
import doobie.implicits.toDoobieStreamOps
4+
import com.intenthq.action_processor.integrations.{DoobieImplicits, SourceContext}
55
import doobie.util.query.Query0
66
import doobie.util.transactor.Transactor
77
import doobie.util.transactor.Transactor.Aux
88

9-
abstract class SQLFeed[I, O](driver: String) extends Feed[I, O] {
9+
abstract class SQLFeed[I, O](driver: String) extends Feed[I, O] with DoobieImplicits {
1010

1111
protected val jdbcUrl: String
12-
protected def query: Query0[I]
12+
protected def query(sourceContext: SourceContext[IO]): Query0[I]
1313

1414
implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
1515

@@ -18,8 +18,8 @@ abstract class SQLFeed[I, O](driver: String) extends Feed[I, O] {
1818

1919
protected def createTransactor: Aux[IO, Unit] = Transactor.fromDriverManager[IO](driver, jdbcUrl)
2020

21-
override def inputStream: fs2.Stream[IO, I] =
22-
query
21+
override def inputStream(sourceContext: SourceContext[IO]): fs2.Stream[IO, I] =
22+
query(sourceContext)
2323
.streamWithChunkSize(chunkSize)
2424
.transact[IO](transactor)
2525
}

src/test/resources/persons.csv

Lines changed: 0 additions & 4 deletions
This file was deleted.

0 commit comments

Comments
 (0)