Skip to content

Commit 9943d3c

Browse files
fernandomoramerlinrabens
authored andcommitted
Split tests
1 parent c731713 commit 9943d3c

File tree

5 files changed

+76
-39
lines changed

5 files changed

+76
-39
lines changed

src/main/scala/com/intenthq/action_processor/integrations/serializations/csv/Csv.scala

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,32 +5,31 @@ import java.time._
55
import magnolia._
66

77
trait Csv[A] {
8-
def toCSV(a: A): Array[String]
8+
def toCSV(a: A): Seq[String]
99
}
1010

1111
object Csv {
1212
def apply[A](implicit csv: Csv[A]): Typeclass[A] = csv
1313

1414
type Typeclass[A] = Csv[A]
1515

16-
def combine[A](ctx: CaseClass[Csv, A]): Csv[A] =
17-
(a: A) => ctx.parameters.foldLeft(Array.empty[String])((acc, p) => acc ++ p.typeclass.toCSV(p.dereference(a)))
16+
def combine[A](ctx: CaseClass[Csv, A]): Csv[A] = (a: A) => ctx.parameters.flatMap(p => p.typeclass.toCSV(p.dereference(a)))
1817

1918
def dispatch[A](ctx: SealedTrait[Csv, A]): Csv[A] = (a: A) => ctx.dispatch(a)(sub => sub.typeclass.toCSV(sub.cast(a)))
2019

21-
implicit def csvOpt[T: Csv]: Csv[Option[T]] = (a: Option[T]) => a.fold(Array[String](""))(Csv[T].toCSV)
22-
implicit def csvIterable[T: Csv]: Csv[Iterable[T]] = (a: Iterable[T]) => a.foldLeft(Array[String](""))((acc, t) => acc ++ Csv[T].toCSV(t))
23-
implicit val csvStr: Csv[String] = (a: String) => Array(a)
24-
implicit val csvInt: Csv[Int] = (a: Int) => Array(a.toString)
25-
implicit val csvLong: Csv[Long] = (a: Long) => Array(a.toString)
26-
implicit val csvFloat: Csv[Float] = (a: Float) => Array(a.toString)
27-
implicit val csvDouble: Csv[Double] = (a: Double) => Array(a.toString)
28-
implicit val csvBigDecimal: Csv[BigDecimal] = (a: BigDecimal) => Array(a.toString)
29-
implicit val csvBoolean: Csv[Boolean] = (a: Boolean) => Array(a.toString)
30-
implicit val csvLocalDate: Csv[LocalDate] = (a: LocalDate) => Array(a.toString)
31-
implicit val csvLocalTime: Csv[LocalTime] = (a: LocalTime) => Array(a.toString)
32-
implicit val csvInstant: Csv[Instant] = (a: Instant) => Array(a.toString)
33-
implicit val csvLocalDateTime: Csv[LocalDateTime] = (a: LocalDateTime) => Array(a.toString)
20+
implicit def csvOpt[T: Csv]: Csv[Option[T]] = (a: Option[T]) => a.fold(Seq(""))(Csv[T].toCSV)
21+
implicit def csvIterable[T: Csv]: Csv[Iterable[T]] = (a: Iterable[T]) => Seq(a.map(Csv[T].toCSV).mkString(","))
22+
implicit val csvStr: Csv[String] = (a: String) => Seq(a)
23+
implicit val csvInt: Csv[Int] = (a: Int) => Seq(a.toString)
24+
implicit val csvLong: Csv[Long] = (a: Long) => Seq(a.toString)
25+
implicit val csvFloat: Csv[Float] = (a: Float) => Seq(a.toString)
26+
implicit val csvDouble: Csv[Double] = (a: Double) => Seq(a.toString)
27+
implicit val csvBigDecimal: Csv[BigDecimal] = (a: BigDecimal) => Seq(a.toString)
28+
implicit val csvBoolean: Csv[Boolean] = (a: Boolean) => Seq(a.toString)
29+
implicit val csvLocalDate: Csv[LocalDate] = (a: LocalDate) => Seq(a.toString)
30+
implicit val csvLocalTime: Csv[LocalTime] = (a: LocalTime) => Seq(a.toString)
31+
implicit val csvInstant: Csv[Instant] = (a: Instant) => Seq(a.toString)
32+
implicit val csvLocalDateTime: Csv[LocalDateTime] = (a: LocalDateTime) => Seq(a.toString)
3433

3534
implicit def deriveCsv[A]: Csv[A] = macro Magnolia.gen[A]
3635
}

src/main/scala/com/intenthq/action_processor/integrations/serializations/csv/CsvSerialization.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,16 @@ object CsvSerialization {
1414
writer
1515
}
1616

17-
private def encode[O](o: O)(implicit csv: Csv[O]): Array[String] = csv.toCSV(o)
17+
def encodeAsColumns[O](o: O)(implicit csv: Csv[O]): Seq[String] = csv.toCSV(o)
1818

19-
def serialize[O](o: O)(implicit csv: Csv[O]): Array[Byte] = {
19+
def columnsAsCsv(columns: Iterable[String]): Array[Byte] = {
2020
val sw = new StringWriter()
21-
val columns = encode(o)
2221
val appender = csvWriter.append(sw)
23-
appender.appendLine(columns: _*)
22+
appender.appendLine(columns.toSeq: _*)
2423
// Make sure we flush internal appender FastBufferedWriter
2524
appender.close()
2625
sw.toString.getBytes(StandardCharsets.UTF_8)
2726
}
27+
28+
def serialize[O](o: O)(implicit csv: Csv[O]): Array[Byte] = columnsAsCsv(encodeAsColumns(o))
2829
}

src/test/scala/com/intenthq/action_processor/integrations/CSVFeedSpec.scala

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@ import java.nio.file.Files
77
import cats.effect.{IO, Resource}
88
import cats.implicits._
99
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
10-
import com.intenthq.action_processor.integrationsV2.aggregations.Aggregate
10+
import com.intenthq.action_processor.integrationsV2.aggregations.NoAggregate
1111
import com.intenthq.action_processor.integrationsV2.feeds.LocalFileCSVFeed
12-
import fs2.Pipe
1312
import weaver.IOSuite
1413

15-
object CSVFeedSpec extends IOSuite with CsvFeedSpecResources {
14+
object CSVFeedSpec extends IOSuite with CSVFeedSpecResources {
1615

1716
override val csvFeedContent: String =
1817
""""Peter"|"Big Street 1"|"5"
@@ -23,9 +22,10 @@ object CSVFeedSpec extends IOSuite with CsvFeedSpecResources {
2322

2423
test("should return a stream of aggregated csv feed rows") { resources =>
2524
val expectedResult: Set[String] = Set(
26-
"Peter,Big Street 1,11",
25+
"Peter,Big Street 1,5",
2726
"Gabriela,Big Street 2,7",
28-
"Jolie,Big Street 3,4"
27+
"Jolie,Big Street 3,4",
28+
"Peter,Big Street 1,6"
2929
).map(_ + '\n')
3030

3131
for {
@@ -35,7 +35,8 @@ object CSVFeedSpec extends IOSuite with CsvFeedSpecResources {
3535
}
3636
}
3737

38-
trait CsvFeedSpecResources { self: IOSuite =>
38+
trait CSVFeedSpecResources { self: IOSuite =>
39+
3940
case class Resources(csvFeed: ExampleLocalFileCSVFeed)
4041
override type Res = Resources
4142

@@ -61,18 +62,9 @@ trait CsvFeedSpecResources { self: IOSuite =>
6162
}
6263
}
6364

64-
case class AggregatedPerson(name: String, address: String)
65-
66-
class ExampleLocalFileCSVFeed(override val localFilePath: String) extends LocalFileCSVFeed[AggregatedPerson] {
65+
class ExampleLocalFileCSVFeed(override val localFilePath: String) extends LocalFileCSVFeed[Iterable[String]] with NoAggregate[Iterable[String]] {
6766

6867
csvReader.setFieldSeparator('|')
6968

70-
private def key(columns: Iterable[String]) = {
71-
val v = columns.toVector
72-
AggregatedPerson(v(0), v(1))
73-
}
74-
75-
private def counter(columns: Iterable[String]) = columns.lastOption.flatMap(v => scala.util.Try(v.toLong).toOption).getOrElse(0L)
76-
override def transform: Pipe[IO, Iterable[String], (AggregatedPerson, Long)] = Aggregate.aggregateByKey[Iterable[String], AggregatedPerson](key, counter)
77-
override def serialize(a: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((a, counter))
69+
override def serialize(o: Iterable[String], counter: Long): Array[Byte] = CsvSerialization.columnsAsCsv(o)
7870
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.intenthq.action_processor.integrations
2+
3+
import java.nio.charset.StandardCharsets
4+
5+
import cats.effect.IO
6+
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
7+
import com.intenthq.action_processor.integrationsV2.aggregations.Aggregate
8+
import com.intenthq.action_processor.integrationsV2.feeds.Feed
9+
import fs2.Pipe
10+
import weaver.SimpleIOSuite
11+
12+
object FeedAggregatedSpec extends SimpleIOSuite {
13+
14+
test("should return a stream of aggregated csv feed rows") {
15+
val aggregatedFeed = new PersonsAggregatedByScoreFeed(
16+
Person("Peter", "Big Street 1", 5),
17+
Person("Gabriela", "Big Street 2", 7),
18+
Person("Jolie", "Big Street 3", 4),
19+
Person("Peter", "Big Street 1", 6)
20+
)
21+
22+
val expectedResult: Set[String] = Set(
23+
"Peter,Big Street 1,11",
24+
"Gabriela,Big Street 2,7",
25+
"Jolie,Big Street 3,4"
26+
).map(_ + '\n')
27+
28+
for {
29+
feedStreamLinesBytes <- aggregatedFeed.stream(SourceContext.empty).compile.toList
30+
feedStreamLines = feedStreamLinesBytes.map(bytes => new String(bytes, StandardCharsets.UTF_8)).toSet
31+
} yield expect(feedStreamLines == expectedResult)
32+
}
33+
}
34+
35+
case class Person(name: String, address: String, score: Int)
36+
case class AggregatedPerson(name: String, address: String)
37+
38+
class PersonsAggregatedByScoreFeed(persons: Person*) extends Feed[Person, AggregatedPerson] {
39+
override def inputStream(sourceContext: SourceContext[IO]): fs2.Stream[IO, Person] = fs2.Stream(persons: _*).covary[IO]
40+
41+
override def transform: Pipe[IO, Person, (AggregatedPerson, Long)] =
42+
Aggregate.aggregateByKey[Person, AggregatedPerson](person => AggregatedPerson(person.name, person.address), _.score.toLong)
43+
44+
override def serialize(o: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((o, counter))
45+
}

src/test/scala/com/intenthq/action_processor/integrations/SQLFeedSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import weaver.IOSuite
1818

1919
import scala.concurrent.ExecutionContextExecutor
2020

21-
object SQLFeedSpec extends IOSuite with SqlCsvFeedSpecResource {
21+
object SQLFeedSpec extends IOSuite with SQLFeedSpecResources {
2222

2323
override val exampleRows: Seq[ExampleCsvFeedRow] = (1 to 100).map(n =>
2424
ExampleCsvFeedRow(
@@ -46,7 +46,7 @@ object SQLFeedSpec extends IOSuite with SqlCsvFeedSpecResource {
4646

4747
}
4848

49-
trait SqlCsvFeedSpecResource { self: IOSuite =>
49+
trait SQLFeedSpecResources { self: IOSuite =>
5050
override type Res = Transactor[IO]
5151

5252
protected val exampleRows: Seq[ExampleCsvFeedRow]

0 commit comments

Comments
 (0)