|
| 1 | +package com.intenthq.action_processor.integrationsV2 |
| 2 | + |
| 3 | +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} |
| 4 | + |
| 5 | +import cats.effect.{IO, Resource} |
| 6 | +import doobie.util.query |
| 7 | + |
| 8 | +import scala.jdk.CollectionConverters.IteratorHasAsScala |
| 9 | + |
| 10 | +/** |
| 11 | + * Scenarios with Processor |
| 12 | + * |
| 13 | + * 1. () => Stream[IO, Array[Byte]] |
| 14 | + * 2. Stream[IO, I] => Stream[IO, Array[Byte]] |
| 15 | + * 3. Stream[IO, I] => Stream[IO, O] => Stream[IO, Array[Byte]] |
| 16 | + * 4. Stream[IO, I] => Stream[IO, O] => Stream[IO, (K,V)] ... Drain ... Stream[IO, (K1,V1)] => Stream[IO, Array[Byte]] |
| 17 | + */ |
| 18 | + |
object Aggregate {

  /** Builds a Pipe that drains the source stream while accumulating per-key totals,
    * then emits one `(key, total)` pair per distinct key.
    *
    * @param feedContext currently unused by the aggregation itself — kept for interface
    *                    symmetry with the other Feed stages. NOTE(review): confirm it is
    *                    intentionally unused before removing.
    * @param key     extracts the aggregation key from an input element
    * @param counter extracts the amount contributed by an input element
    */
  def apply[I, A](feedContext: FeedContext, key: I => A, counter: I => Long): fs2.Pipe[IO, I, (A, Long)] =
    sourceStream => {
      // Fresh accumulator per pipe invocation, so a Feed can be run more than once.
      val repository: ConcurrentMap[A, Long] = new ConcurrentHashMap[A, Long]()

      // Atomically add this element's count to the running total for its key.
      // `merge` replaces the previous getOrDefault + put pair, which was a
      // non-atomic read-modify-write and could lose updates under concurrency.
      def put(o: I): IO[Unit] =
        IO.delay(repository.merge(key(o), counter(o), (a, b) => a + b)).void

      // Emits the accumulated (key, total) pairs. Evaluated only after the
      // upstream drain completes, because fs2's `++` takes its right side by name.
      def streamKeyValue: fs2.Stream[IO, (A, Long)] =
        fs2.Stream
          .fromIterator[IO](
            repository
              .entrySet()
              .iterator()
              .asScala
          )
          .map(e => (e.getKey, e.getValue))

      // Drain the source for its side effects, then emit the totals.
      // The previous Stream.resource(Resource.liftF(...)) wrapper was a no-op
      // (pure lift, no finalizer, result discarded) and has been removed.
      sourceStream.evalMap(put).drain ++ streamKeyValue
    }
}
| 48 | + |
/** A Feed describes the three stages of producing serialized output:
  * read raw elements, aggregate them into `(key, count)` pairs, and
  * serialize each pair to bytes. `stream` wires the stages together.
  */
trait Feed[I, A] {
  /** Source of raw input elements for the given context. */
  def inputStream(feedContext: FeedContext): fs2.Stream[IO, I]

  /** Aggregation step turning raw elements into `(key, count)` pairs. */
  def transform(feedContext: FeedContext): fs2.Pipe[IO, I, (A, Long)]

  /** Serializes a single aggregated pair to its byte representation. */
  def serialize(a: (A, Long)): Array[Byte]

  /** Composes the three stages into the final byte stream. */
  final def stream(processorContext: FeedContext): fs2.Stream[IO, Array[Byte]] = {
    val source     = inputStream(processorContext)
    val aggregated = source.through(transform(processorContext))
    aggregated.map(serialize)
  }
}
| 59 | + |
| 60 | +//object Main { |
| 61 | +// def main(args: Array[String]): Unit = { |
| 62 | +// class A extends Processor with Aggregations[Int] { |
| 63 | +// override protected def sourceStream(processorContext: ProcessorContext): Stream[IO, Int] = fs2.Stream(1, 2, 3, 4) |
| 64 | +//// override val repository: ConcurrentMap[String, Long] = new ConcurrentHashMap[String, Long]() |
| 65 | +// override def key(a: Int): String = a.toString |
| 66 | +// override def value(a: Int): Long = 1L |
| 67 | +// override def serializeAggregatedKV(o2: (String, Long)): Array[Byte] = s"${o2._1},${o2._2}".getBytes(StandardCharsets.UTF_8) |
| 68 | +// } |
| 69 | +// } |
| 70 | +//} |
| 71 | + |
| 72 | +//object Main2 { |
| 73 | +// def main(args: Array[String]): Unit = { |
| 74 | +// class A extends Processor { |
| 75 | +// override protected def stream(processorContext: FeedContext): fs2.Stream[IO, Array[Byte]] = |
| 76 | +// fs2.Stream(1, 2, 3, 4, 5).map(_.toString.getBytes) |
| 77 | +// } |
| 78 | +// } |
| 79 | +//} |
| 80 | +// |
| 81 | +//object Main3 { |
| 82 | +// def main(args: Array[String]): Unit = { |
| 83 | +// class A extends Processor with HiveSource[Int] { |
| 84 | +// override protected val driver: String = "Mysql" |
| 85 | +// override protected val jdbcUrl: String = "jdbc://blah" |
| 86 | +// |
| 87 | +// override protected def query(processorContext: FeedContext): query.Query0[Int] = ??? |
| 88 | +// |
| 89 | +// override def serializeRow(o2: Int): Array[Byte] = o2.toString.getBytes |
| 90 | +// } |
| 91 | +// } |
| 92 | +//} |
| 93 | +// |
| 94 | +//object Main4 { |
| 95 | +// def main(args: Array[String]): Unit = { |
| 96 | +// class A extends Processor with SQLSource[Int] { |
| 97 | +// override protected val driver: String = "Mysql" |
| 98 | +// override protected val jdbcUrl: String = "jdbc://blah" |
| 99 | +// |
| 100 | +// override protected def query(processorContext: FeedContext): query.Query0[Int] = ??? |
| 101 | +// |
| 102 | +// override def serializeRow(o2: Int): Array[Byte] = null |
| 103 | +// |
| 104 | +// override protected def stream(processorContext: FeedContext): Stream[IO, Array[Byte]] = |
| 105 | +// |
| 106 | +// } |
| 107 | +// } |
| 108 | +//} |
0 commit comments