Skip to content

Commit c15017d

Browse files
author
Merlin Rabens
committed
#2309 Add csv local file source
Signed-off-by: Merlin Rabens <merlin.rabens@intenthq.com>
1 parent f39dc7b commit c15017d

File tree

8 files changed

+122
-2
lines changed

8 files changed

+122
-2
lines changed

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ lazy val root = (project in file("."))
1717
scalafmtOnCompile := true,
1818
libraryDependencies ++= Seq(
1919
"co.fs2" %% "fs2-core" % "2.4.4",
20+
"co.fs2" %% "fs2-io" % "2.4.4",
2021
"com.propensive" %% "magnolia" % "0.17.0",
2122
"de.siegmar" % "fastcsv" % "1.0.3",
2223
"org.mapdb" % "mapdb" % "3.0.8",
2324
"org.tpolecat" %% "doobie-core" % "0.9.0",
2425
"org.tpolecat" %% "doobie-hikari" % "0.9.0",
26+
"com.google.guava" % "guava" % "30.0-jre",
2527
"com.disneystreaming" %% "weaver-framework" % "0.5.0" % "test",
2628
"com.disneystreaming" %% "weaver-scalacheck" % "0.4.3" % "test",
2729
"org.tpolecat" %% "doobie-h2" % "0.9.0" % "test"
Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,46 @@
11
package com.intenthq.hybrid.integrations
22

3-
class CsvSource {}
3+
import java.io.StringReader
4+
import java.nio.charset.StandardCharsets
5+
6+
import cats.effect.{IO, Resource}
7+
import cats.implicits.catsSyntaxApplicativeId
8+
import de.siegmar.fastcsv.reader.CsvReader
9+
import fs2.Stream
10+
11+
import scala.jdk.CollectionConverters._
12+
13+
abstract class CsvSource extends Source[String] {
14+
protected val csvResource: String
15+
protected def lines(): Stream[IO, String]
16+
override def stream(context: SourceContext[IO]): Stream[IO, Array[Byte]] = lines().map(_.getBytes)
17+
18+
protected lazy val csvReader: CsvReader = new CsvReader
19+
20+
protected def csvParse(line: String): IO[Iterable[String]] =
21+
Resource.fromAutoCloseable(IO.delay(new StringReader(line))).use { sr =>
22+
Option(csvReader.parse(sr))
23+
.flatMap(parser => Option(parser.nextRow().getFields.asScala))
24+
.getOrElse(Iterable.empty[String])
25+
.pure[IO]
26+
}
27+
28+
}
29+
30+
abstract class CsvSourceAgg extends CsvSource {
31+
32+
protected def aggregate(context: SourceContext[IO])(line: String): IO[Unit]
33+
34+
override def stream(context: SourceContext[IO]): Stream[IO, Array[Byte]] = {
35+
lazy val readMapAndEmit: Stream[IO, Array[Byte]] =
36+
Stream
37+
.fromIterator[IO](
38+
context.map
39+
.entrySet()
40+
.iterator()
41+
.asScala
42+
)
43+
.map(e => (e.getKey + ',' + e.getValue + '\n').getBytes(StandardCharsets.UTF_8))
44+
super.stream(context).filter(_.nonEmpty).evalMap(line => aggregate(context)(new String(line))).drain ++ readMapAndEmit
45+
}
46+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.intenthq.hybrid.integrations
2+
3+
import com.google.common.net.InternetDomainName
4+
5+
import scala.util.Try
6+
7+
object DomainStemmer {
8+
// using publicsuffix.org
9+
def apply(url: String): String = Try(InternetDomainName.from(url).topPrivateDomain().toString).getOrElse("")
10+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.intenthq.hybrid.integrations
2+
import java.nio.file.Paths
3+
4+
import cats.effect.{Blocker, ContextShift, IO}
5+
import fs2.{text, Stream}
6+
7+
import scala.util.Properties
8+
9+
abstract class LocalFileCsvSource extends CsvSourceAgg {
10+
implicit protected val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
11+
override protected val csvResource: String = Properties.envOrElse("CSV_RESOURCE", "/data.csv")
12+
override protected def lines(): Stream[IO, String] =
13+
Stream.resource(Blocker[IO]).flatMap { blocker =>
14+
fs2.io.file
15+
.readAll[IO](Paths.get(csvResource), blocker, 4096)
16+
.through(text.utf8Decode)
17+
.through(text.lines)
18+
.dropLastIf(_.isEmpty)
19+
}
20+
}

src/test/resources/example.csv

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
"447599999999"|"20150620000000"|"173806"|"www2.microsoft.co.uk"
2+
"447722222222"|"20150620000000"|"155322"|"apple.com"
3+
"447599999999"|"20150620000000"|"173806"|"m.microsoft.co.uk"
4+
"447599999999"|"20150620000000"|"173806"|"www.microsoft.co.uk"

src/test/resources/rows.csv

Whitespace-only changes.
Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
11
package com.intenthq.hybrid.integrations
22

3-
object CsvFeedSpec {}
3+
import weaver.SimpleIOSuite
4+
5+
object CsvFeedSpec extends SimpleIOSuite {
6+
7+
private val expectedResult: Set[String] = Set(
8+
"447722222222,20150620000000,apple.com,1",
9+
"447599999999,20150620000000,microsoft.co.uk,3"
10+
).map(_ + '\n')
11+
12+
simpleTest("should return a stream of parsed O2 weblog feed rows") {
13+
for {
14+
rowsBytes <- ExampleLocalFileCsvFeed.stream(SourceContext.empty).compile.toList
15+
rows = rowsBytes.map(new String(_)).toSet
16+
} yield expect(rows == expectedResult)
17+
}
18+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.intenthq.hybrid.integrations
2+
3+
import java.nio.file.Paths
4+
5+
import cats.effect.IO
6+
7+
object ExampleLocalFileCsvFeed extends LocalFileCsvSource {
8+
9+
override protected val csvResource: String = Paths.get(getClass.getResource("/example.csv").toURI).toAbsolutePath.toString
10+
11+
csvReader.setFieldSeparator('|')
12+
13+
override def aggregate(context: SourceContext[IO])(line: String): IO[Unit] = {
14+
def addToContextMap(csvTokens: Iterable[String]) =
15+
csvTokens.lastOption.map(DomainStemmer.apply).fold(IO.unit) { domain =>
16+
val key = (csvTokens.dropRight(2) ++ Seq(domain)).mkString(",")
17+
val previousCounter = context.map.getOrDefault(key, 0)
18+
IO.delay(context.map.put(key, previousCounter + 1)).void
19+
}
20+
21+
for {
22+
csvTokens <- csvParse(line)
23+
_ <- addToContextMap(csvTokens)
24+
} yield ()
25+
}
26+
}

0 commit comments

Comments
 (0)