Commit b70ccb6

marc ramirez and merlinrabens authored and committed
Add aggregation and simple hive implementation
1 parent 16cb71b commit b70ccb6

File tree (1 file changed, +66 -57 lines changed)
  • src/main/scala/com/intenthq/action_processor/integrationsV2

Lines changed: 66 additions & 57 deletions
@@ -1,9 +1,14 @@
 package com.intenthq.action_processor.integrationsV2
 
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 
-import cats.effect.{IO, Resource}
-import doobie.util.query
+import cats.effect.{ContextShift, IO, Resource}
+import doobie.implicits.{toDoobieStreamOps, toSqlInterpolator}
+import doobie.util.query.Query0
+import doobie.util.transactor.Transactor
+import doobie.util.transactor.Transactor.Aux
+import fs2.Pipe
 
 import scala.jdk.CollectionConverters.IteratorHasAsScala
 
@@ -18,17 +23,17 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
 
 object Aggregate {
 
-  def apply[I, A](feedContext: FeedContext, key: I => A, counter: I => Long): fs2.Pipe[IO, I, (A, Long)] =
+  def apply[I, K]( /*feedContext: FeedContext,*/ key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
     sourceStream => {
-      val repository: ConcurrentMap[A, Long] = new ConcurrentHashMap[A, Long]()
+      val repository: ConcurrentMap[K, Long] = new ConcurrentHashMap[K, Long]()
 
       def put(o: I): IO[Unit] =
         IO.delay {
          val previousCounter = repository.getOrDefault(key(o), 0L)
          repository.put(key(o), counter(o) + previousCounter)
        }.void
 
-      def streamKeyValue: fs2.Stream[IO, (A, Long)] =
+      def streamKeyValue: fs2.Stream[IO, (K, Long)] =
        fs2.Stream
          .fromIterator[IO](
            repository
@@ -38,7 +43,7 @@ object Aggregate {
          )
          .map(e => (e.getKey, e.getValue))
 
-      fs2.Stream.resource[IO, ConcurrentMap[A, Long]](Resource.liftF(IO.delay(repository))).flatMap { _ =>
+      fs2.Stream.resource[IO, ConcurrentMap[K, Long]](Resource.liftF(IO.delay(repository))).flatMap { _ =>
        sourceStream.evalMap { i =>
          put(i)
        }.drain ++ streamKeyValue
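
A quick illustration of the reworked Aggregate pipe (a sketch only, not part of this commit): it drains the source stream into the backing ConcurrentHashMap and only then emits the accumulated (key, count) pairs. The input stream and key function below are made up for the example; it assumes the same cats-effect 2 / fs2 APIs this file already uses.

// Sketch only (not part of this commit): exercising the Aggregate pipe on an in-memory stream.
object AggregateSketch {
  import cats.effect.IO

  def main(args: Array[String]): Unit = {
    val words: fs2.Stream[IO, String] = fs2.Stream("a", "b", "a", "c", "a")

    // Key by the word itself and count one per occurrence; the source is drained first,
    // then the totals are emitted: ("a", 3), ("b", 1), ("c", 1) in map-iteration order.
    val counts: List[(String, Long)] =
      words.through(Aggregate.apply(identity[String], _ => 1L)).compile.toList.unsafeRunSync()

    println(counts)
  }
}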
@@ -48,61 +53,65 @@ object Aggregate {
 
 trait Feed[I, A] {
   def inputStream(feedContext: FeedContext): fs2.Stream[IO, I]
-  def transform(feedContext: FeedContext): fs2.Pipe[IO, I, (A, Long)]
-  def serialize(a: (A, Long)): Array[Byte]
+  def transform(feedContext: FeedContext): fs2.Pipe[IO, I, A]
+  def serialize(a: A): Array[Byte]
 
   final def stream(processorContext: FeedContext): fs2.Stream[IO, Array[Byte]] =
     inputStream(processorContext)
      .through(transform(processorContext))
      .map(serialize)
 }
 
-//object Main {
-//  def main(args: Array[String]): Unit = {
-//    class A extends Processor with Aggregations[Int] {
-//      override protected def sourceStream(processorContext: ProcessorContext): Stream[IO, Int] = fs2.Stream(1, 2, 3, 4)
-////      override val repository: ConcurrentMap[String, Long] = new ConcurrentHashMap[String, Long]()
-//      override def key(a: Int): String = a.toString
-//      override def value(a: Int): Long = 1L
-//      override def serializeAggregatedKV(o2: (String, Long)): Array[Byte] = s"${o2._1},${o2._2}".getBytes(StandardCharsets.UTF_8)
-//    }
-//  }
-//}
-
-//object Main2 {
-//  def main(args: Array[String]): Unit = {
-//    class A extends Processor {
-//      override protected def stream(processorContext: FeedContext): fs2.Stream[IO, Array[Byte]] =
-//        fs2.Stream(1, 2, 3, 4, 5).map(_.toString.getBytes)
-//    }
-//  }
-//}
-//
-//object Main3 {
-//  def main(args: Array[String]): Unit = {
-//    class A extends Processor with HiveSource[Int] {
-//      override protected val driver: String = "Mysql"
-//      override protected val jdbcUrl: String = "jdbc://blah"
-//
-//      override protected def query(processorContext: FeedContext): query.Query0[Int] = ???
-//
-//      override def serializeRow(o2: Int): Array[Byte] = o2.toString.getBytes
-//    }
-//  }
-//}
-//
-//object Main4 {
-//  def main(args: Array[String]): Unit = {
-//    class A extends Processor with SQLSource[Int] {
-//      override protected val driver: String = "Mysql"
-//      override protected val jdbcUrl: String = "jdbc://blah"
-//
-//      override protected def query(processorContext: FeedContext): query.Query0[Int] = ???
-//
-//      override def serializeRow(o2: Int): Array[Byte] = null
-//
-//      override protected def stream(processorContext: FeedContext): Stream[IO, Array[Byte]] =
-//
-//    }
-//  }
-//}
+abstract class SQLFeed[I, O] extends Feed[I, O] {
+  protected val jdbcUrl: String
+
+  protected val driver: String
+
+  protected def query(feedContext: FeedContext): Query0[I]
+
+  override def inputStream(feedContext: FeedContext): fs2.Stream[IO, I] =
+    query(feedContext)
+      .streamWithChunkSize(chunkSize)
+      .transact[IO](transactor)
+
+  implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
+
+  protected def createTransactor: Aux[IO, Unit] = Transactor.fromDriverManager[IO](driver, jdbcUrl)
+
+  protected lazy val transactor: Transactor[IO] = createTransactor
+
+  protected val chunkSize: Int = doobie.util.query.DefaultChunkSize
+}
+
+abstract class Hive[I, O] extends SQLFeed[I, O] {
+
+  override protected val jdbcUrl: String = ""
+
+  override protected val driver: String = ""
+
+}
+
+object Main {
+  def main(args: Array[String]): Unit = {
+
+    class NoAggCase extends Hive[Int, String] {
+
+      override protected def query(feedContext: FeedContext): Query0[Int] = sql"1".query[Int]
+
+      override def transform(feedContext: FeedContext): Pipe[IO, Int, String] = s => s.map(_.toString)
+
+      override def serialize(a: String): Array[Byte] = a.getBytes(StandardCharsets.UTF_8)
+    }
+
+    class AggCase extends Hive[Int, (String, Long)] {
+
+      override protected def query(feedContext: FeedContext): Query0[Int] = sql"1".query[Int]
+
+      override def transform(feedContext: FeedContext): Pipe[IO, Int, (String, Long)] = Aggregate.apply(_.toString, _ => 1L)
+
+      override def serialize(a: (String, Long)): Array[Byte] = a._1.getBytes(StandardCharsets.UTF_8)
+    }
+
+    new AggCase().stream(new FeedContext()).compile.drain.unsafeRunSync()
+  }
+}
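
For a more realistic use of SQLFeed than the empty-configured Hive above, a subclass only needs to supply a driver, a JDBC URL and a Query0, and the inherited inputStream takes care of chunked streaming through the doobie transactor. The sketch below is purely illustrative and not part of this commit: the driver class, URL and table name are hypothetical placeholders, and only the doobie calls already imported by this file are used.

// Sketch only (not part of this commit): a concrete SQLFeed against a hypothetical JDBC source.
class UserCountsFeed extends SQLFeed[(String, Long), (String, Long)] {

  override protected val driver: String = "org.h2.Driver"      // hypothetical in-memory H2 database
  override protected val jdbcUrl: String = "jdbc:h2:mem:feeds"

  // Rows are streamed in chunks of `chunkSize` through the transactor created by SQLFeed.
  override protected def query(feedContext: FeedContext): Query0[(String, Long)] =
    sql"SELECT name, total FROM user_counts".query[(String, Long)]

  // No aggregation: rows pass straight through to serialization.
  override def transform(feedContext: FeedContext): Pipe[IO, (String, Long), (String, Long)] =
    identity

  override def serialize(a: (String, Long)): Array[Byte] =
    s"${a._1},${a._2}\n".getBytes(StandardCharsets.UTF_8)
}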
