Skip to content

Commit a26cc7a

Browse files
marc ramirez and merlin rabens
authored and committed
WIP
1 parent 18f0792 commit a26cc7a

File tree

4 files changed

+127
-0
lines changed

4 files changed

+127
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.intenthq.hybrid.integrationsV2
2+
3+
import java.nio.charset.StandardCharsets
4+
5+
import cats.effect._
6+
import com.intenthq.hybrid.integrations.{JavaLegacyTimeMeta, TimeMeta}
7+
import doobie.util.transactor.{Strategy, Transactor}
8+
9+
import scala.util.Properties
10+
11+
/** A [[Processor]] that reads rows from Hive over JDBC and aggregates them
  * into (key, count) pairs serialized as CSV lines.
  */
abstract class HiveSource[O] extends Processor with SQLSource[O] with Aggregations[O] with TimeMeta with JavaLegacyTimeMeta {

  /** Fully-qualified class name of the Hive JDBC driver. */
  override protected val driver: String = "org.apache.hive.jdbc.HiveDriver"

  /** Connection URL; overridable through the HIVE_JDBC_URL environment variable. */
  override val jdbcUrl: String = Properties.envOrElse("HIVE_JDBC_URL", "jdbc:hive2://localhost:10000")

  // Replace the default transaction strategy with a no-op one — presumably
  // because Hive's JDBC driver rejects transaction control calls (NOTE(review): confirm).
  override protected lazy val transactor: Transactor[IO] = Transactor.strategy.set(createTransactor, Strategy.void)

  /** Encodes one aggregated (key, count) pair as a UTF-8 CSV line. */
  override def serialize(o2: (String, Long)): Array[Byte] = {
    val (k, v) = o2
    s"$k,$v\n".getBytes(StandardCharsets.UTF_8)
  }
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.intenthq.hybrid.integrationsV2
2+
3+
import java.util.concurrent.ConcurrentMap
4+
5+
import cats.effect.{IO, Resource}
6+
import fs2.Stream
7+
8+
import scala.jdk.CollectionConverters.IteratorHasAsScala
9+
10+
/**
11+
* Scenarios with Processor
12+
*
13+
* 1. () => Stream[IO, Array[Byte]]
14+
* 2. Stream[IO, I] => Stream[IO, Array[Byte]]
15+
* 3. Stream[IO, I] => Stream[IO, O] => Stream[IO, Array[Byte]]
16+
* 4. Stream[IO, I] => Stream[IO, O] => Stream[IO, (K,V)] ... Drain ... Stream[IO, (K1,V1)] => Stream[IO, Array[Byte]]
17+
*/
18+
19+
/** Entry point of an integration pipeline: produces the fully serialized
  * output bytes for a given processing context.
  */
trait Processor {
  /** Runs the pipeline and emits the serialized output as a byte stream. */
  def stream(processorContext: ProcessorContext): fs2.Stream[IO, Array[Byte]]
}
22+
23+
/** Supplier of raw input elements of type `I` for a [[Processor]] pipeline. */
trait Source[I] { self: Processor =>
  /** Streams the raw input elements for the given context. */
  protected def sourceStream(processorContext: ProcessorContext): fs2.Stream[IO, I]
}
26+
27+
/** Encoder of pipeline output elements of type `O` into raw bytes. */
trait Sink[O] { self: Processor =>
  /** Serializes one output element to its byte representation. */
  def serialize(o2: O): Array[Byte]
}
30+
31+
/** Aggregation stage: drains the source, summing a Long value per String key
  * into `repository`, then emits the serialized (key, total) entries.
  */
trait Aggregations[I] extends Source[I] with Sink[(String, Long)] { self: Processor =>

  // Accumulator mapping key -> running total. NOTE(review): assumed to be a
  // thread-safe implementation such as ConcurrentHashMap — confirm with implementors.
  val repository: ConcurrentMap[String, Long]

  /** Grouping key of an element; elements with an empty key are skipped. */
  def key(a: I): String

  /** Value an element contributes to its key's total. */
  def value(a: I): Long

  override def serialize(o2: (String, Long)): Array[Byte]

  /** First drains the source while accumulating into `repository`, then
    * streams the serialized aggregated entries.
    */
  override def stream(processorContext: ProcessorContext): fs2.Stream[IO, Array[Byte]] =
    Stream
      .resource[IO, ConcurrentMap[String, Long]](Resource.liftF(IO(repository)))
      .flatMap { _ =>
        sourceStream(processorContext).evalMap { o =>
          if (key(o).nonEmpty) put(o) else IO.unit
        }.drain ++ streamKeyValue.evalMap(i => IO.delay(serialize(i)))
      }

  // FIX: the previous getOrDefault-then-put sequence was a non-atomic
  // read-modify-write, losing updates under concurrent evaluation.
  // ConcurrentMap.merge performs the accumulation atomically.
  private def put(o: I): IO[Unit] =
    IO.delay {
      repository.merge(key(o), value(o), (prev, inc) => prev + inc)
    }.void

  /** Streams the accumulated entries as (key, total) pairs. */
  private def streamKeyValue: Stream[IO, (String, Long)] =
    fs2.Stream
      .fromIterator[IO](
        repository
          .entrySet()
          .iterator()
          .asScala
      )
      .map(e => (e.getKey, e.getValue))
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.intenthq.hybrid.integrationsV2
2+
3+
// Placeholder for per-run processing parameters (dates, tenant, etc.);
// currently carries no fields. NOTE(review): WIP commit — fields to follow.
case class ProcessorContext()
object ProcessorContext {}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.intenthq.hybrid.integrationsV2
2+
3+
import java.util.concurrent.ForkJoinPool
4+
5+
import cats.effect.{ContextShift, IO}
6+
import com.intenthq.hybrid.integrations.DoobieImplicits
7+
import doobie.util.query.Query0
8+
import doobie.util.transactor.Transactor
9+
import doobie.util.transactor.Transactor.Aux
10+
import fs2._
11+
12+
/** A [[Source]] that streams the rows of a doobie query over JDBC. */
trait SQLSource[I] extends Processor with Source[I] with DoobieImplicits {

  // Fully-qualified JDBC driver class name, supplied by the concrete source.
  protected val driver: String

  // JDBC connection URL, supplied by the concrete source.
  protected val jdbcUrl: String

  /** The query whose rows this source streams for the given context. */
  protected def query(processorContext: ProcessorContext): Query0[I]

  // NOTE(review): shifts on the global ExecutionContext; JDBC calls block,
  // so a dedicated blocking pool may be preferable — confirm.
  implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)

  // Builds a transactor straight from DriverManager — no connection pooling.
  protected def createTransactor: Aux[IO, Unit] = Transactor.fromDriverManager[IO](driver, jdbcUrl)

  // Lazy and overridable so subclasses (e.g. Hive) can swap the transaction strategy.
  protected lazy val transactor: Transactor[IO] = createTransactor

  // JDBC fetch size used when streaming results; doobie's default unless overridden.
  protected val chunkSize: Int = doobie.util.query.DefaultChunkSize

  /** Streams the query's rows in chunks through the transactor. */
  override protected def sourceStream(processorContext: ProcessorContext): Stream[IO, I] =
    query(processorContext)
      .streamWithChunkSize(chunkSize)
      .transact[IO](transactor)

}
34+
35+
object SQLSource {
  /** Default parallelism for SQL sources: one per available processor.
    *
    * FIX: the previous implementation cast `ExecutionContext.global` to
    * `ForkJoinPool`, which throws `ClassCastException` at runtime — the global
    * context is an `ExecutionContextExecutor` wrapper, not the pool itself.
    * The JVM's processor count is the pool's default parallelism anyway.
    */
  val DefaultParallelism: Int = Runtime.getRuntime.availableProcessors()
}

0 commit comments

Comments
 (0)