Skip to content

Commit ef48e73

Browse files
Merlin Rabensmerlinrabens
authored andcommitted
Next evolutionary step
1 parent 1faaa62 commit ef48e73

File tree

5 files changed

+37
-34
lines changed

5 files changed

+37
-34
lines changed

src/main/scala/com/intenthq/action_processor/integrations/serializations/csv/CsvSerialization.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,15 @@ import java.io._
44
import java.nio.charset.StandardCharsets
55

66
import cats.effect.IO
7-
import de.siegmar.fastcsv.writer.CsvWriter
7+
//import de.siegmar.fastcsv.writer.CsvWriter
8+
import com.opencsv.CSVWriter
89

910
import scala.collection.immutable.ArraySeq
1011

1112
object CsvSerialization {
1213

1314
val lineDelimiter: String = "\n"
14-
private val csvWriter = {
15-
val writer = new CsvWriter()
16-
writer.setLineDelimiter(lineDelimiter.toCharArray)
17-
writer
18-
}
15+
private val csvWriter = new CSVWriter(new StringWriter())
1916

2017
def serialize[O](o: O)(implicit csv: Csv[O]): IO[Array[Byte]] =
2118
IO.delay {

src/main/scala/com/intenthq/action_processor/integrationsV2/CsvFeed.scala

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,32 @@ import sourcecode.Text.generate
1010

1111
import scala.jdk.CollectionConverters._
1212

13-
abstract class CsvFeed[I <: Product, O] extends Feed[I, O] {
13+
abstract class CsvFeed[O] extends Feed[Iterable[String], O] {
1414

15-
protected val csvResource: String
15+
// protected val csvResource: String
1616

17-
private lazy val typeFactory = new ReflectionHelpers.CaseClassFactory[I]
17+
// private lazy val typeFactory = new ReflectionHelpers.CaseClassFactory[I]
1818

1919
protected lazy val csvReader: CsvReader = new CsvReader
2020

21-
protected def rows: Stream[IO, I]
21+
protected def rows: Stream[IO, String]
2222

2323
private def csvParse(line: String): IO[Iterable[String]] =
2424
Resource.fromAutoCloseable(IO.delay(new StringReader(line))).use { sr =>
2525
Option(csvReader.parse(sr))
2626
.flatMap(parser => Option(parser.nextRow().getFields.asScala))
27-
.getOrElse(Iterable.empty[String])
27+
.getOrElse(collection.mutable.Buffer.empty[String])
2828
.pure[IO]
2929
}
3030

31-
override def inputStream: Stream[IO, I] = rows
31+
override def inputStream: Stream[IO, Iterable[String]] =
32+
rows.evalMap(csvParse)
3233

33-
protected def fromString: fs2.Pipe[IO, String, I] =
34-
sourceStream => {
35-
sourceStream.evalMap { line =>
36-
val params = csvParse(line).productIterator.toList
37-
IO(typeFactory.buildWith(params))
38-
}
39-
}
34+
// protected def fromString: fs2.Pipe[IO, Iterable[String], I] =
35+
// sourceStream => {
36+
// sourceStream.evalMap { line =>
37+
// val params = .productIterator.toList
38+
// IO(typeFactory.buildWith(params))
39+
// }
40+
// }
4041
}

src/main/scala/com/intenthq/action_processor/integrationsV2/Feed.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ trait Feed[I, O] {
1111
final def stream: fs2.Stream[IO, Array[Byte]] =
1212
inputStream
1313
.through(transform)
14-
.map { case (a, counter) => serialize(a, counter) }
14+
.map((serialize _).tupled)
1515
}
1616

1717
trait NoAggregate[I] { self: Feed[I, I] =>

src/main/scala/com/intenthq/action_processor/integrationsV2/LocalFileCsvFeed.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,16 @@ import fs2.text
77

88
import scala.util.Properties
99

10-
abstract class LocalFileCsvFeed[I <: Product, O] extends CsvFeed[I, O] {
10+
trait LocalFileCsvFeed[O] extends CsvFeed[O] {
1111

1212
implicit protected val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
13-
override protected val csvResource: String = Properties.envOrElse("CSV_RESOURCE", "/data.csv")
13+
protected val localFilePath: String = Properties.envOrElse("CSV_RESOURCE", "/data.csv")
1414

15-
override protected def rows: fs2.Stream[IO, I] =
15+
override protected def rows: fs2.Stream[IO, String] =
1616
fs2.Stream.resource(Blocker[IO]).flatMap { blocker =>
1717
fs2.io.file
18-
.readAll[IO](Paths.get(csvResource), blocker, 4096)
18+
.readAll[IO](Paths.get(localFilePath), blocker, 4096)
1919
.through(text.utf8Decode)
2020
.through(text.lines)
21-
.through(fromString)
2221
}
2322
}

src/test/scala/com/intenthq/action_processor/integrations/ExampleLocalFileCsvFeed.scala

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,29 @@ import java.nio.file.Paths
44

55
import cats.effect.IO
66
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
7-
import com.intenthq.action_processor.integrationsV2.{Aggregate, LocalFileCsvFeed}
7+
import com.intenthq.action_processor.integrationsV2.{Aggregate, LocalFileCsvFeed, NoAggregate}
88
import fs2.Pipe
99

10-
case class Person(name: String, address: String, score: Int) {
11-
lazy val aggregateKey: AggregatedPerson = AggregatedPerson(name, address)
12-
}
13-
1410
case class AggregatedPerson(name: String, address: String)
1511

16-
object ExampleLocalFileCsvFeed extends LocalFileCsvFeed[Person, AggregatedPerson] {
12+
object ExampleLocalFileCsvFeed extends LocalFileCsvFeed[AggregatedPerson] {
1713

18-
override protected val csvResource: String = Paths.get(getClass.getResource("/persons.csv").toURI).toAbsolutePath.toString
14+
override protected val localFilePath: String = Paths.get(getClass.getResource("/persons.csv").toURI).toAbsolutePath.toString
1915

2016
csvReader.setFieldSeparator('|')
2117

22-
override def transform: Pipe[IO, Person, (AggregatedPerson, Long)] =
23-
Aggregate.aggregateByKey[Person, AggregatedPerson](_.aggregateKey, _.score.toLong)
18+
private def key(columns: Iterable[String]) = {
19+
val v = columns.toVector
20+
AggregatedPerson(v(0), v(1))
21+
}
22+
23+
private def counter(columns: Iterable[String]) = columns.lastOption.flatMap(v => scala.util.Try(v.toLong).toOption).getOrElse(0L)
24+
25+
override def transform: Pipe[IO, Iterable[String], (AggregatedPerson, Long)] = Aggregate.aggregateByKey[Iterable[String], AggregatedPerson](key, counter)
2426

2527
override def serialize(a: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((a, counter)).unsafeRunSync()
2628
}
29+
30+
object ExampleLocalFileCsvFeed2 extends LocalFileCsvFeed[Iterable[String]] with NoAggregate[Iterable[String]] {
31+
override def serialize(o: Iterable[String], counter: Long): Array[Byte] = CsvSerialization.serialize(o).unsafeRunSync()
32+
}

0 commit comments

Comments
 (0)