
Commit 1faaa62

Merlin Rabens (merlinrabens) authored and committed
Add CSV Feed derivative
1 parent e17adfe commit 1faaa62

File tree

13 files changed: +194 -121 lines changed


src/main/resources/example.csv

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+"447599999999"|"20150620000000"|"173806"|"www2.microsoft.co.uk"
+"447722222222"|"20150620000000"|"155322"|"apple.com"
+"447599999999"|"20150620000000"|"173806"|"m.microsoft.co.uk"
+"447599999999"|"20150620000000"|"173806"|"www.microsoft.co.uk"

src/main/scala/com/intenthq/action_processor/integrations/serializations/csv/CsvSerialization.scala

Lines changed: 0 additions & 1 deletion
@@ -29,5 +29,4 @@ object CsvSerialization {
     appender.close()
     sw.toString.getBytes(StandardCharsets.UTF_8)
   }
-
 }
src/main/scala/com/intenthq/action_processor/integrationsV2/Aggregate.scala

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+package com.intenthq.action_processor.integrationsV2
+
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+
+import cats.effect.{IO, Resource}
+
+import scala.jdk.CollectionConverters.IteratorHasAsScala
+
+object Aggregate {
+
+  def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)
+
+  private def loadAggRepository[K]: Resource[IO, ConcurrentMap[K, Long]] =
+    Resource.liftF(IO.pure(new ConcurrentHashMap[K, Long]()))
+
+  def aggregateByKey[I, K](key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
+    sourceStream => {
+
+      def put(aggRepository: ConcurrentMap[K, Long], o: I): IO[Unit] =
+        IO.delay {
+          val previousCounter = aggRepository.getOrDefault(key(o), 0L)
+          aggRepository.put(key(o), counter(o) + previousCounter)
+        }.void
+
+      def streamKeyValue(aggRepository: ConcurrentMap[K, Long]): fs2.Stream[IO, (K, Long)] =
+        fs2.Stream
+          .fromIterator[IO](
+            aggRepository
+              .entrySet()
+              .iterator()
+              .asScala
+          )
+          .map(e => (e.getKey, e.getValue))
+
+      fs2.Stream.resource[IO, ConcurrentMap[K, Long]](loadAggRepository).flatMap { aggRepository =>
+        sourceStream.evalMap { i =>
+          put(aggRepository, i)
+        }.drain ++ streamKeyValue(aggRepository)
+      }
+    }
+}
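
For orientation, here is a minimal sketch of how the new Aggregate pipes compose with a source stream. The PageView type and its fields are hypothetical, not part of this commit.

import cats.effect.IO

// Hypothetical input type: one record per page view.
case class PageView(msisdn: String, domain: String)

object AggregateUsageSketch {
  // Counts page views per domain: each element adds 1 to its key's running total.
  val countByDomain: fs2.Pipe[IO, PageView, (String, Long)] =
    Aggregate.aggregateByKey[PageView, String](_.domain, _ => 1L)

  // Keeps elements as-is, pairing each with a count of 1.
  val passthrough: fs2.Pipe[IO, PageView, (PageView, Long)] = Aggregate.noop
}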

src/main/scala/com/intenthq/action_processor/integrationsV2/CsvFeed.scala

Lines changed: 19 additions & 10 deletions
@@ -4,28 +4,37 @@ import java.io.StringReader
 
 import cats.effect.{IO, Resource}
 import cats.implicits.catsSyntaxApplicativeId
-import com.intenthq.action_processor.integrations.SourceContext
-
-import scala.jdk.CollectionConverters._
 import de.siegmar.fastcsv.reader.CsvReader
 import fs2.Stream
+import sourcecode.Text.generate
+
+import scala.jdk.CollectionConverters._
+
+abstract class CsvFeed[I <: Product, O] extends Feed[I, O] {
 
-abstract class CsvFeed extends Feed[String, String] {
   protected val csvResource: String
 
+  private lazy val typeFactory = new ReflectionHelpers.CaseClassFactory[I]
+
   protected lazy val csvReader: CsvReader = new CsvReader
 
-  protected def csvParse(line: String): IO[Iterable[String]] =
+  protected def rows: Stream[IO, I]
+
+  private def csvParse(line: String): IO[Iterable[String]] =
     Resource.fromAutoCloseable(IO.delay(new StringReader(line))).use { sr =>
       Option(csvReader.parse(sr))
         .flatMap(parser => Option(parser.nextRow().getFields.asScala))
         .getOrElse(Iterable.empty[String])
         .pure[IO]
     }
 
-  override def inputStream(feedContext: FeedContext): Stream[IO, String]
-}
+  override def inputStream: Stream[IO, I] = rows
 
-new LocalFileCsvFeed(){
-  override csvResource = "file.csv"
-}
+  protected def fromString: fs2.Pipe[IO, String, I] =
+    sourceStream => {
+      sourceStream.evalMap { line =>
+        val params = csvParse(line).productIterator.toList
+        IO(typeFactory.buildWith(params))
+      }
+    }
+}
src/main/scala/com/intenthq/action_processor/integrationsV2/Feed.scala

Lines changed: 9 additions & 89 deletions
@@ -1,99 +1,19 @@
 package com.intenthq.action_processor.integrationsV2
 
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import cats.effect.IO
 
-import cats.effect.{ContextShift, IO, Resource}
-import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
-import doobie.implicits.{toDoobieStreamOps, toSqlInterpolator}
-import doobie.util.query.Query0
-import doobie.util.transactor.Transactor
-import doobie.util.transactor.Transactor.Aux
-import fs2.Pipe
+trait Feed[I, O] {
 
-import scala.jdk.CollectionConverters.IteratorHasAsScala
+  def inputStream: fs2.Stream[IO, I]
+  def transform: fs2.Pipe[IO, I, (O, Long)]
+  def serialize(o: O, counter: Long): Array[Byte]
 
-/**
-  * Scenarios with Processor
-  *
-  * 1. () => Stream[IO, Array[Byte]]
-  * 2. Stream[IO, I] => Stream[IO, Array[Byte]]
-  * 3. Stream[IO, I] => Stream[IO, O] => Stream[IO, Array[Byte]]
-  * 4. Stream[IO, I] => Stream[IO, O] => Stream[IO, (K,V)] ... Drain ... Stream[IO, (K1,V1)] => Stream[IO, Array[Byte]]
-  */
-
-object Aggregate {
-
-  def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)
-
-  private def loadAggRepository[K]: Resource[IO, ConcurrentMap[K, Long]] =
-    Resource.pure(new ConcurrentHashMap[K, Long]())
-
-  def aggregateByKey[I, K]( /*feedContext: FeedContext,*/ key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
-    sourceStream => {
-
-      def put(aggRepository: ConcurrentMap[K, Long], o: I): IO[Unit] =
-        IO.delay {
-          val previousCounter = aggRepository.getOrDefault(key(o), 0L)
-          aggRepository.put(key(o), counter(o) + previousCounter)
-        }.void
-
-      def streamKeyValue(aggRepository: ConcurrentMap[K, Long]): fs2.Stream[IO, (K, Long)] =
-        fs2.Stream
-          .fromIterator[IO](
-            aggRepository
-              .entrySet()
-              .iterator()
-              .asScala
-          )
-          .map(e => (e.getKey, e.getValue))
-
-      fs2.Stream.resource[IO, ConcurrentMap[K, Long]](loadAggRepository).flatMap { aggRepository =>
-        sourceStream.evalMap { i =>
-          put(i)
-        }.drain ++ streamKeyValue
-      }
-    }
-}
-
-trait Feed[I, A] {
-  def inputStream(feedContext: FeedContext): fs2.Stream[IO, I]
-  def transform(feedContext: FeedContext): fs2.Pipe[IO, I, (A, Long)]
-  def serialize(a: A, counter: Long): Array[Byte]
-
-  final def stream(processorContext: FeedContext): fs2.Stream[IO, Array[Byte]] =
-    inputStream(processorContext)
-      .through(transform(processorContext))
+  final def stream: fs2.Stream[IO, Array[Byte]] =
+    inputStream
+      .through(transform)
       .map { case (a, counter) => serialize(a, counter) }
 }
 
 trait NoAggregate[I] { self: Feed[I, I] =>
-  override def transform(feedContext: FeedContext): fs2.Pipe[IO, I, (I, Long)] = Aggregate.noop
-}
-
-object Main {
-  def main(args: Array[String]): Unit = {
-
-    case class Person(name: String, address: String, score: Int) {
-      lazy val aggregateKey = new AggregatedPerson(name, address)
-    }
-    case class AggregatedPerson(name: String, address: String)
-
-    class PersonFeed extends HiveFeed[Person, Person] with NoAggregate[Person] {
-
-      override protected def query(feedContext: FeedContext): Query0[Person] = sql"SELECT 'Nic Cage', 9000".query[Person]
-
-      override def serialize(a: Person, counter: Long): Array[Byte] = CsvSerialization.serialize(a).unsafeRunSync()
-    }
-
-    class PersonsAggregatedByScoreFeed extends HiveFeed[Person, AggregatedPerson] {
-
-      override protected def query(feedContext: FeedContext): Query0[Person] = sql"SELECT 'Nic Cage', 9000".query[Person]
-
-      override def transform(feedContext: FeedContext): Pipe[IO, Person, (AggregatedPerson, Long)] =
-        Aggregate.aggregateByKey[Person, AggregatedPerson](_.aggregateKey, _.score)
-
-      override def serialize(a: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((a, counter)).unsafeRunSync()
-    }
-  }
+  override def transform: fs2.Pipe[IO, I, (I, Long)] = Aggregate.noop
 }
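
To make the slimmed-down Feed trait concrete, a small in-memory sketch (StaticFeed is illustrative only and not part of this commit):

import java.nio.charset.StandardCharsets

import cats.effect.IO

// Emits three fixed strings, leaves them unaggregated via NoAggregate,
// and serializes each one as a UTF-8 line.
class StaticFeed extends Feed[String, String] with NoAggregate[String] {
  override def inputStream: fs2.Stream[IO, String] = fs2.Stream.emits(List("a", "b", "c")).covary[IO]
  override def serialize(o: String, counter: Long): Array[Byte] = (o + "\n").getBytes(StandardCharsets.UTF_8)
}

// new StaticFeed().stream produces the bytes of "a\n", "b\n" and "c\n" when run.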

src/main/scala/com/intenthq/action_processor/integrationsV2/FeedContext.scala

Lines changed: 0 additions & 4 deletions
This file was deleted.
src/main/scala/com/intenthq/action_processor/integrationsV2/HiveFeed.scala

Lines changed: 4 additions & 1 deletion
@@ -1,9 +1,12 @@
 package com.intenthq.action_processor.integrationsV2
 
+import cats.effect.IO
 import com.intenthq.action_processor.integrations.{JavaLegacyTimeMeta, TimeMeta}
+import doobie.util.transactor.{Strategy, Transactor}
 
 import scala.util.Properties
 
-abstract class HiveFeed[I, O] extends SQLFeed[I, O]("org.apache.hive.jdbc.HiveDriver") with TimeMeta with JavaLegacyTimeMeta {
+abstract class HiveFeed[I, O] extends SqlFeed[I, O]("org.apache.hive.jdbc.HiveDriver") with TimeMeta with JavaLegacyTimeMeta {
+  override protected lazy val transactor: Transactor[IO] = Transactor.strategy.set(createTransactor, Strategy.void)
   override protected val jdbcUrl: String = Properties.envOrElse("HIVE_JDBC_URL", "jdbc:hive2://localhost:10000")
 }
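
As a sketch of how the Main example removed from Feed.scala would look against the new parameterless signatures; Person, AggregatedPerson and the literal query are illustrative values carried over from the deleted code:

import cats.effect.IO
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
import doobie.implicits.toSqlInterpolator
import doobie.util.query.Query0

case class Person(name: String, address: String, score: Int) {
  lazy val aggregateKey: AggregatedPerson = AggregatedPerson(name, address)
}
case class AggregatedPerson(name: String, address: String)

// One output row per input row, counter fixed at 1 via NoAggregate.
class PersonFeed extends HiveFeed[Person, Person] with NoAggregate[Person] {
  override protected def query: Query0[Person] = sql"SELECT 'Nic Cage', 'Somewhere', 9000".query[Person]
  override def serialize(a: Person, counter: Long): Array[Byte] = CsvSerialization.serialize(a).unsafeRunSync()
}

// Person rows are folded into (AggregatedPerson, summed score) pairs before serialization.
class PersonsAggregatedByScoreFeed extends HiveFeed[Person, AggregatedPerson] {
  override protected def query: Query0[Person] = sql"SELECT 'Nic Cage', 'Somewhere', 9000".query[Person]
  override def transform: fs2.Pipe[IO, Person, (AggregatedPerson, Long)] =
    Aggregate.aggregateByKey[Person, AggregatedPerson](_.aggregateKey, _.score.toLong)
  override def serialize(a: AggregatedPerson, counter: Long): Array[Byte] =
    CsvSerialization.serialize((a, counter)).unsafeRunSync()
}
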
src/main/scala/com/intenthq/action_processor/integrationsV2/LocalFileCsvFeed.scala

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+package com.intenthq.action_processor.integrationsV2
+
+import java.nio.file.Paths
+
+import cats.effect.{Blocker, ContextShift, IO}
+import fs2.text
+
+import scala.util.Properties
+
+abstract class LocalFileCsvFeed[I <: Product, O] extends CsvFeed[I, O] {
+
+  implicit protected val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
+  override protected val csvResource: String = Properties.envOrElse("CSV_RESOURCE", "/data.csv")
+
+  override protected def rows: fs2.Stream[IO, I] =
+    fs2.Stream.resource(Blocker[IO]).flatMap { blocker =>
+      fs2.io.file
+        .readAll[IO](Paths.get(csvResource), blocker, 4096)
+        .through(text.utf8Decode)
+        .through(text.lines)
+        .through(fromString)
+    }
+}
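
A rough sketch of a concrete feed over the example.csv added in this commit. The Visit case class, its field names, and the FastCSV setFieldSeparator call are assumptions for illustration; the resource is given as a plain filesystem path because rows reads it via fs2.io.file.readAll.

import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization

// Hypothetical row type matching the four pipe-delimited columns of example.csv.
case class Visit(msisdn: String, timestamp: String, duration: String, domain: String)

class VisitsFeed extends LocalFileCsvFeed[Visit, Visit] with NoAggregate[Visit] {
  override protected val csvResource: String = "src/main/resources/example.csv"

  // example.csv is pipe-delimited rather than comma-delimited, so the FastCSV
  // reader is reconfigured here (assuming FastCSV 1.x's setFieldSeparator).
  csvReader.setFieldSeparator('|')

  override def serialize(o: Visit, counter: Long): Array[Byte] = CsvSerialization.serialize(o).unsafeRunSync()
}
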
src/main/scala/com/intenthq/action_processor/integrationsV2/ReflectionHelpers.scala

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+package com.intenthq.action_processor.integrationsV2
+
+import scala.reflect.runtime.universe
+import scala.reflect.runtime.universe.{runtimeMirror, termNames, typeOf, weakTypeOf}
+
+trait ReflectionHelpers {
+
+  protected val classLoaderMirror: universe.Mirror = runtimeMirror(getClass.getClassLoader)
+
+  /**
+    * Encapsulates functionality to reflectively invoke the constructor
+    * for a given case class type `T`.
+    *
+    * @tparam T the type of the case class this factory builds
+    */
+  class CaseClassFactory[T <: Product] {
+
+    val tpe: universe.Type = weakTypeOf[T]
+    val classSymbol: universe.ClassSymbol = tpe.typeSymbol.asClass
+
+    if (!(tpe <:< typeOf[Product] && classSymbol.isCaseClass))
+      throw new IllegalArgumentException(
+        "CaseClassFactory only applies to case classes!"
+      )
+
+    val classMirror: universe.ClassMirror = classLoaderMirror reflectClass classSymbol
+
+    val constructorSymbol: universe.Symbol = tpe.decl(termNames.CONSTRUCTOR)
+
+    val defaultConstructor: universe.MethodSymbol =
+      if (constructorSymbol.isMethod) constructorSymbol.asMethod
+      else {
+        val ctors = constructorSymbol.asTerm.alternatives
+        ctors.map(_.asMethod).find(_.isPrimaryConstructor).get
+      }
+
+    val constructorMethod: universe.MethodMirror = classMirror reflectConstructor defaultConstructor
+
+    /**
+      * Attempts to create a new instance of the specified type by calling the
+      * constructor method with the supplied arguments.
+      *
+      * @param args the arguments to supply to the constructor method
+      */
+    def buildWith(args: Seq[_]): T = constructorMethod(args: _*).asInstanceOf[T]
+  }
+}
+
+object ReflectionHelpers extends ReflectionHelpers
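
To show what CaseClassFactory is doing under the hood, a standalone sketch of the same constructor-reflection steps spelled out against a concrete type; Visit and its fields are hypothetical:

import scala.reflect.runtime.universe._

// Hypothetical case class mirroring one row of example.csv.
case class Visit(msisdn: String, timestamp: String, duration: String, domain: String)

object ReflectiveBuildSketch {
  private val mirror = runtimeMirror(getClass.getClassLoader)
  private val visitType = typeOf[Visit]
  private val classMirror = mirror.reflectClass(visitType.typeSymbol.asClass)
  private val ctor = visitType.decl(termNames.CONSTRUCTOR).asMethod
  private val ctorMirror = classMirror.reflectConstructor(ctor)

  // Invokes the primary constructor with positional arguments, as buildWith does.
  def build(fields: Seq[String]): Visit = ctorMirror(fields: _*).asInstanceOf[Visit]
}

// ReflectiveBuildSketch.build(Seq("447599999999", "20150620000000", "173806", "www.microsoft.co.uk"))
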
src/main/scala/com/intenthq/action_processor/integrationsV2/SqlFeed.scala

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+package com.intenthq.action_processor.integrationsV2
+
+import cats.effect.{ContextShift, IO}
+import doobie.implicits.toDoobieStreamOps
+import doobie.util.query.Query0
+import doobie.util.transactor.Transactor
+import doobie.util.transactor.Transactor.Aux
+
+abstract class SqlFeed[I, O](driver: String) extends Feed[I, O] {
+
+  protected val jdbcUrl: String
+  protected def query: Query0[I]
+
+  implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
+
+  protected lazy val transactor: Transactor[IO] = createTransactor
+  protected val chunkSize: Int = doobie.util.query.DefaultChunkSize
+
+  protected def createTransactor: Aux[IO, Unit] = Transactor.fromDriverManager[IO](driver, jdbcUrl)
+
+  override def inputStream: fs2.Stream[IO, I] =
+    query
+      .streamWithChunkSize(chunkSize)
+      .transact[IO](transactor)
+}
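
For comparison, a sketch of a non-Hive feed built directly on SqlFeed; the PostgreSQL driver, JDBC URL, visits table and DomainCount type are assumptions, not part of this commit:

import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
import doobie.implicits.toSqlInterpolator
import doobie.util.query.Query0

import scala.util.Properties

// Hypothetical pre-aggregated row type: one line per domain.
case class DomainCount(domain: String, total: Long)

class PostgresDomainFeed extends SqlFeed[DomainCount, DomainCount]("org.postgresql.Driver") with NoAggregate[DomainCount] {
  override protected val jdbcUrl: String = Properties.envOrElse("PG_JDBC_URL", "jdbc:postgresql://localhost:5432/feeds")
  override protected def query: Query0[DomainCount] =
    sql"SELECT domain, count(*) FROM visits GROUP BY domain".query[DomainCount]
  override def serialize(o: DomainCount, counter: Long): Array[Byte] = CsvSerialization.serialize(o).unsafeRunSync()
}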

0 commit comments
