
Commit fd4e49a

fernandomoramerlinrabens authored and committed
Non-effectful CSV serialization
1 parent ef48e73 · commit fd4e49a

9 files changed, with 35 additions and 63 deletions.

src/main/scala/com/intenthq/action_processor/integrations/serializations/csv/Csv.scala

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ object Csv {
   def dispatch[A](ctx: SealedTrait[Csv, A]): Csv[A] = (a: A) => ctx.dispatch(a)(sub => sub.typeclass.toCSV(sub.cast(a)))

   implicit def csvOpt[T: Csv]: Csv[Option[T]] = (a: Option[T]) => a.fold(Array[String](""))(Csv[T].toCSV)
+  implicit def csvIterable[T: Csv]: Csv[Iterable[T]] = (a: Iterable[T]) => a.foldLeft(Array[String](""))((acc, t) => acc ++ Csv[T].toCSV(t))
   implicit val csvStr: Csv[String] = (a: String) => Array(a)
   implicit val csvInt: Csv[Int] = (a: Int) => Array(a.toString)
   implicit val csvLong: Csv[Long] = (a: Long) => Array(a.toString)
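
Note: a minimal sketch of how the new Csv[Iterable[T]] instance composes with the existing ones; the example values below are inferred from the instances in this file and are not part of the commit itself.

import com.intenthq.action_processor.integrations.serializations.csv.Csv

// Csv[T].toCSV(t) yields the CSV columns for a value of T.
val int: Array[String]  = Csv[Int].toCSV(42)                    // Array("42")
val none: Array[String] = Csv[Option[Int]].toCSV(None)          // Array("")
// The Iterable instance folds every element's columns into one row; as written,
// the Array("") seed contributes a leading empty column.
val many: Array[String] = Csv[Iterable[Int]].toCSV(List(1, 2))  // Array("", "1", "2")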

src/main/scala/com/intenthq/action_processor/integrations/serializations/csv/CsvSerialization.scala

Lines changed: 11 additions & 12 deletions
@@ -3,26 +3,25 @@ package com.intenthq.action_processor.integrations.serializations.csv
 import java.io._
 import java.nio.charset.StandardCharsets

-import cats.effect.IO
-//import de.siegmar.fastcsv.writer.CsvWriter
-import com.opencsv.CSVWriter
-
-import scala.collection.immutable.ArraySeq
+import de.siegmar.fastcsv.writer.CsvWriter

 object CsvSerialization {

   val lineDelimiter: String = "\n"
-  private val csvWriter = new CSVWriter(new StringWriter())
+  private val csvWriter = {
+    val writer = new CsvWriter()
+    writer.setLineDelimiter(lineDelimiter.toCharArray)
+    writer
+  }

-  def serialize[O](o: O)(implicit csv: Csv[O]): IO[Array[Byte]] =
-    IO.delay {
-      unsafeSerialise(ArraySeq.unsafeWrapArray(csv.toCSV(o)))
-    }
+  private def encode[O](o: O)(implicit csv: Csv[O]): Array[String] = csv.toCSV(o)

-  def unsafeSerialise(row: Seq[String]): Array[Byte] = {
+  def serialize[O](o: O)(implicit csv: Csv[O]): Array[Byte] = {
     val sw = new StringWriter()
+    val columns = encode(o)
     val appender = csvWriter.append(sw)
-    appender.appendLine(row: _*)
+    appender.appendLine(columns: _*)
+    // Make sure we flush internal appender FastBufferedWriter
     appender.close()
     sw.toString.getBytes(StandardCharsets.UTF_8)
   }
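
Note: a brief sketch of what the change above means at call sites. PersonRow is a hypothetical row type used only for illustration, and the sketch assumes a derived Csv[PersonRow] instance is in scope (the same derivation the test suite below relies on).

import cats.effect.IO
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization

case class PersonRow(name: String, visits: Int) // hypothetical example row

// serialize is now a pure function that returns the encoded bytes directly...
val bytes: Array[Byte] = CsvSerialization.serialize(PersonRow("ann", 3))

// ...so call sites that still need an effect lift the result with IO.pure
// instead of sequencing an IO, as ExampleCsvFeed does further down.
val inIO: IO[Array[Byte]] = IO.pure(CsvSerialization.serialize(PersonRow("ann", 3)))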

src/main/scala/com/intenthq/action_processor/integrationsV2/CsvFeed.scala

Lines changed: 1 addition & 2 deletions
@@ -6,7 +6,6 @@ import cats.effect.{IO, Resource}
 import cats.implicits.catsSyntaxApplicativeId
 import de.siegmar.fastcsv.reader.CsvReader
 import fs2.Stream
-import sourcecode.Text.generate

 import scala.jdk.CollectionConverters._

@@ -23,7 +22,7 @@ abstract class CsvFeed[O] extends Feed[Iterable[String], O] {
   private def csvParse(line: String): IO[Iterable[String]] =
     Resource.fromAutoCloseable(IO.delay(new StringReader(line))).use { sr =>
       Option(csvReader.parse(sr))
-        .flatMap(parser => Option(parser.nextRow().getFields.asScala))
+        .flatMap(parser => Option(parser.nextRow()).map(_.getFields.asScala))
         .getOrElse(collection.mutable.Buffer.empty[String])
         .pure[IO]
     }
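
Note: the change above guards against the parser returning null once the input is exhausted. A small self-contained sketch of the pattern follows; the CsvReader is constructed directly here for illustration only, since CsvFeed already holds one.

import java.io.StringReader

import de.siegmar.fastcsv.reader.CsvReader

import scala.jdk.CollectionConverters._

val parser = new CsvReader().parse(new StringReader(""))
// Old shape: Option(parser.nextRow().getFields.asScala) dereferences the row
// before Option can catch the null, so an empty input throws a NullPointerException.
// New shape: wrap the possibly-null row first, then map over it.
val fields: Iterable[String] =
  Option(parser.nextRow())
    .map(_.getFields.asScala)
    .getOrElse(collection.mutable.Buffer.empty[String])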

src/main/scala/com/intenthq/action_processor/integrationsV2/HiveFeed.scala

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ import doobie.util.transactor.{Strategy, Transactor}

 import scala.util.Properties

-abstract class HiveFeed[I, O] extends SqlFeed[I, O]("org.apache.hive.jdbc.HiveDriver") with TimeMeta with JavaLegacyTimeMeta {
+abstract class HiveFeed[I, O] extends SQLFeed[I, O]("org.apache.hive.jdbc.HiveDriver") with TimeMeta with JavaLegacyTimeMeta {
   override protected lazy val transactor: Transactor[IO] = Transactor.strategy.set(createTransactor, Strategy.void)
   override protected val jdbcUrl: String = Properties.envOrElse("HIVE_JDBC_URL", "jdbc:hive2://localhost:10000")
 }

src/main/scala/com/intenthq/action_processor/integrationsV2/SQLFeed.scala

Lines changed: 9 additions & 12 deletions
@@ -1,28 +1,25 @@
 package com.intenthq.action_processor.integrationsV2

 import cats.effect.{ContextShift, IO}
-import com.intenthq.action_processor.integrations.SQLSource
 import doobie.implicits.toDoobieStreamOps
 import doobie.util.query.Query0
 import doobie.util.transactor.Transactor
 import doobie.util.transactor.Transactor.Aux

-abstract class SQLFeed[I, O](driver: String, parallelism: Int = SQLSource.DefaultParallelism) extends Feed[I, O] {
+abstract class SQLFeed[I, O](driver: String) extends Feed[I, O] {

   protected val jdbcUrl: String
-
-  protected def query(feedContext: FeedContext): Query0[I]
-
-  override def inputStream(feedContext: FeedContext): fs2.Stream[IO, I] =
-    query(feedContext)
-      .streamWithChunkSize(chunkSize)
-      .transact[IO](transactor)
+  protected def query: Query0[I]

   implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)

-  protected def createTransactor: Aux[IO, Unit] = Transactor.fromDriverManager[IO](driver, jdbcUrl)
-
   protected lazy val transactor: Transactor[IO] = createTransactor
-
   protected val chunkSize: Int = doobie.util.query.DefaultChunkSize
+
+  protected def createTransactor: Aux[IO, Unit] = Transactor.fromDriverManager[IO](driver, jdbcUrl)
+
+  override def inputStream: fs2.Stream[IO, I] =
+    query
+      .streamWithChunkSize(chunkSize)
+      .transact[IO](transactor)
 }
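
Note: a hypothetical concrete feed against the simplified SQLFeed above, only to show the two members that now have to be provided (query no longer receives a FeedContext). The row type, table name and H2 URL are made up for illustration, and the remaining Feed[I, O] members are left abstract here.

import doobie.implicits._
import doobie.util.query.Query0

case class ExampleRow(name: String, visits: Int) // hypothetical row type

abstract class ExampleH2Feed extends SQLFeed[ExampleRow, ExampleRow]("org.h2.Driver") {
  override protected val jdbcUrl: String = "jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"

  // A doobie Query0 built with the sql interpolator; Read[ExampleRow] is derived
  // from the case class fields.
  override protected def query: Query0[ExampleRow] =
    sql"SELECT name, visits FROM example_rows".query[ExampleRow]
}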

src/main/scala/com/intenthq/action_processor/integrationsV2/SqlFeed.scala

Lines changed: 0 additions & 25 deletions
This file was deleted.

src/test/scala/com/intenthq/action_processor/integrations/ExampleCsvFeed.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ abstract class H2Source[O] extends SQLSource[O]("org.h2.Driver") with TimeMeta w

 object ExampleCsvFeed extends H2Source[ExampleCsvFeedRow] {

-  override def transform(context: SourceContext[IO])(e: ExampleCsvFeedRow): IO[Array[Byte]] = CsvSerialization.serialize[ExampleCsvFeedRow](e)
+  override def transform(context: SourceContext[IO])(e: ExampleCsvFeedRow): IO[Array[Byte]] = IO.pure(CsvSerialization.serialize[ExampleCsvFeedRow](e))

   override def query(context: SourceContext[IO]): Query0[ExampleCsvFeedRow] =
     (

src/test/scala/com/intenthq/action_processor/integrations/ExampleLocalFileCsvFeed.scala

Lines changed: 2 additions & 2 deletions
@@ -24,9 +24,9 @@ object ExampleLocalFileCsvFeed extends LocalFileCsvFeed[AggregatedPerson] {

   override def transform: Pipe[IO, Iterable[String], (AggregatedPerson, Long)] = Aggregate.aggregateByKey[Iterable[String], AggregatedPerson](key, counter)

-  override def serialize(a: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((a, counter)).unsafeRunSync()
+  override def serialize(a: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((a, counter))
 }

 object ExampleLocalFileCsvFeed2 extends LocalFileCsvFeed[Iterable[String]] with NoAggregate[Iterable[String]] {
-  override def serialize(o: Iterable[String], counter: Long): Array[Byte] = CsvSerialization.serialize(o).unsafeRunSync()
+  override def serialize(o: Iterable[String], counter: Long): Array[Byte] = CsvSerialization.serialize(o)
 }

src/test/scala/com/intenthq/action_processor/integrations/serializations/csv/CsvSerializationSpec.scala

Lines changed: 9 additions & 8 deletions
@@ -1,19 +1,20 @@
 package com.intenthq.action_processor.integrations.serializations.csv

-import cats.effect.IO
-import cats.implicits._
+import java.nio.charset.StandardCharsets
 import java.time._
+
+import cats.implicits._
 import weaver.{Expectations, SimpleIOSuite}

 object CsvSerializationSpec extends SimpleIOSuite {

-  private def serialize[T: Csv](t: T): IO[String] = CsvSerialization.serialize(t).map(new String(_))
-  private def checkLine[T: Csv](toSer: T, csv: String): IO[Expectations] =
-    for {
-      result <- serialize(toSer)
-    } yield expect(result == csv + CsvSerialization.lineDelimiter)
+  private def serialize[T: Csv](t: T): String = new String(CsvSerialization.serialize(t), StandardCharsets.UTF_8)
+  private def checkLine[T: Csv](toSer: T, csv: String): Expectations = {
+    val result = serialize(toSer)
+    expect(result == csv + CsvSerialization.lineDelimiter)
+  }

-  simpleTest("Serialize a single field case class") {
+  pureTest("Serialize a single field case class") {
     case class Test(a: String)
     checkLine(Test("a"), "a")
   }
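
Note: a hypothetical additional pureTest in the same style as the one above, assuming the same derived Csv instance for the case class and the writer's default ',' field separator.

pureTest("Serialize a two field case class") {
  case class Test2(a: String, b: String)
  checkLine(Test2("a", "b"), "a,b")
}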