
Commit 59e601a

Merlin Rabens (merlinrabens) authored and committed
New approach (discussed with FM and MR)
1 parent 3820e99 commit 59e601a

File tree: 3 files changed, +117 -21 lines changed

src/main/scala/com/intenthq/action_processor/integrationsV2/HiveSource.scala

Lines changed: 3 additions & 5 deletions

@@ -1,18 +1,16 @@
-package com.intenthq.hybrid.integrationsV2
+package com.intenthq.action_processor.integrationsV2
 
 import cats.effect._
-import com.intenthq.hybrid.integrations.{JavaLegacyTimeMeta, TimeMeta}
+import com.intenthq.action_processor.integrations.{JavaLegacyTimeMeta, TimeMeta}
 import doobie.util.transactor.{Strategy, Transactor}
 
 import scala.util.Properties
 
-abstract class HiveSource[O] extends SQLSource[O] with Aggregations[O] with TimeMeta with JavaLegacyTimeMeta {
+abstract class HiveSource[I] extends SQLSource[I] with Sink with TimeMeta with JavaLegacyTimeMeta {
 
   override protected val driver: String = "org.apache.hive.jdbc.HiveDriver"
 
   override val jdbcUrl: String = Properties.envOrElse("HIVE_JDBC_URL", "jdbc:hive2://localhost:10000")
 
   override protected lazy val transactor: Transactor[IO] = Transactor.strategy.set(createTransactor, Strategy.void)
-
-  override def serialize(o2: (String, Long)): Array[Byte]
 }
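
For context, the transactor wiring kept here can be exercised on its own. Below is a minimal, self-contained sketch (the HiveTransactorSketch object and its value names are illustrative, not part of the commit), assuming doobie on cats-effect 2 as used elsewhere in this changeset. Strategy.void replaces the default per-query transaction handling (setAutoCommit/commit/rollback), which Hive's JDBC driver does not support.

import cats.effect.{ContextShift, IO}
import doobie.util.transactor.{Strategy, Transactor}

import scala.concurrent.ExecutionContext

object HiveTransactorSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Plain driver-manager transactor, as built by SQLSource#createTransactor.
  private val base: Transactor.Aux[IO, Unit] =
    Transactor.fromDriverManager[IO]("org.apache.hive.jdbc.HiveDriver", "jdbc:hive2://localhost:10000")

  // Same call as in HiveSource: swap the transactional Strategy for a no-op one.
  val hive: Transactor[IO] = Transactor.strategy.set(base, Strategy.void)
}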

src/main/scala/com/intenthq/action_processor/integrationsV2/Processor.scala

Lines changed: 107 additions & 12 deletions
@@ -1,8 +1,12 @@
-package com.intenthq.hybrid.integrationsV2
+package com.intenthq.action_processor.integrationsV2
 
-import java.util.concurrent.ConcurrentMap
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 
 import cats.effect.{IO, Resource}
+import com.intenthq.action_processor.integrations.SourceContext
+import com.intenthq.hybrid.integrationsV2.ProcessorContext
+import doobie.util.query
 import fs2.Stream
 
 import scala.jdk.CollectionConverters.IteratorHasAsScala
@@ -16,34 +20,66 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
  * 4. Stream[IO, I] => Stream[IO, O] => Stream[IO, (K,V)] ... Drain ... Stream[IO, (K1,V1)] => Stream[IO, Array[Byte]]
  */
 
+object Aggregate {
+  def apply[I, A, AC](processorContext: ProcessorContext, key: I => Option[A], counter: I => Long): fs2.Pipe[IO, I, A] = sourceStream => {
+    Stream
+      .resource[IO, ConcurrentMap[String, Long]](Resource.liftF(IO(repository)))
+      .flatMap { _ =>
+        sourceStream(processorContext).evalMap { o =>
+          if (key(o).nonEmpty) put(o) else IO.unit
+        }.drain ++ streamKeyValue.evalMap(i => IO.delay(serializeAggregatedKV(i)))
+      }
+
+  }
+}
+trait Feed[I, A] {
+  def inputStream(processorContext: ProcessorContext): fs2.Stream[IO, I]
+  def transform(processorContext: ProcessorContext): fs2.Pipe[IO, I, (A, Long)] = Aggregate,apply(k = )
+  def serialize(a: (A, Long)): Array[Byte]
+
+  final def stream(processorContext: ProcessorContext) =
+    inputStream(processorContext)
+      .through(transform(processorContext))
+      .map(serialize)
+}
 trait Processor {
   def stream(processorContext: ProcessorContext): fs2.Stream[IO, Array[Byte]]
 }
 
-trait Source[I] { self: Processor =>
-  protected def sourceStream(processorContext: ProcessorContext): fs2.Stream[IO, I]
-}
+//trait Source[I] { self: Processor =>
+//  protected def sourceStream(processorContext: ProcessorContext): fs2.Stream[IO, I]
+//  protected def pipe: fs2.Pipe[IO, I, Array[Byte]] = _.map(serializeSource)
+//  def serializeSource(i:I): Array[Byte]
+//}
 
-trait Sink[O] { self: Processor =>
-  def serialize(o2: O): Array[Byte]
-}
+//trait Sink[I, O] { self: Processor =>
+//  def serialize(o2: I): O
+//}
 
-trait Aggregations[I] extends Source[I] with Sink[(String, Long)] { self: Processor =>
+trait Aggregations[I, NT] extends Processor {
 
-  val repository: ConcurrentMap[String, Long]
+  lazy val repository: ConcurrentMap[String, Long] = new ConcurrentHashMap[String, Long]()
 
   def key(a: I): String
   def value(a: I): Long
+  def serializeAggregatedKV(a: I):
+
 
-  override def serialize(o2: (String, Long)): Array[Byte]
+  def whatEveer(processorContext: ProcessorContext): Stream[IO, (String, Long)] = Stream
+    .resource[IO, ConcurrentMap[String, Long]](Resource.liftF(IO(repository)))
+    .flatMap { _ =>
+      sourceStream(processorContext).evalMap { o =>
+        if (key(o).nonEmpty) put(o) else IO.unit
+      }.drain ++ streamKeyValue.evalMap(i => IO.delay(serializeAggregatedKV(i)))
+    }
 
   override def stream(processorContext: ProcessorContext): fs2.Stream[IO, Array[Byte]] =
     Stream
      .resource[IO, ConcurrentMap[String, Long]](Resource.liftF(IO(repository)))
      .flatMap { _ =>
        sourceStream(processorContext).evalMap { o =>
          if (key(o).nonEmpty) put(o) else IO.unit
-        }.drain ++ streamKeyValue.evalMap(i => IO.delay(serialize(i)))
+        }.drain ++ streamKeyValue.evalMap(i => IO.delay(serializeAggregatedKV(i)))
      }
 
   private def put(o: I): IO[Unit] =
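
Both whatEveer and stream above follow the same drain-then-emit shape: consume the whole source while accumulating per-key counts in the repository map, then emit the aggregated entries once the source is exhausted. A self-contained sketch of that shape, with hypothetical names (CountByKey, run) and fs2 on cats-effect 2 assumed:

import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentHashMap

import cats.effect.IO
import fs2.Stream

import scala.jdk.CollectionConverters.IteratorHasAsScala

object CountByKey {
  def run[I](source: Stream[IO, I])(key: I => String, value: I => Long): Stream[IO, Array[Byte]] = {
    val repository = new ConcurrentHashMap[String, Long]()

    // Phase 1: drain the source, accumulating a count per key.
    // The read-modify-write is not atomic, but evalMap runs one element at a time.
    val aggregate = source.evalMap { i =>
      IO {
        val k = key(i)
        repository.put(k, repository.getOrDefault(k, 0L) + value(i))
        ()
      }
    }.drain

    // Phase 2: once the source is exhausted, emit the aggregated entries.
    val emit = Stream
      .eval(IO(repository.entrySet().iterator().asScala.map(e => (e.getKey, e.getValue)).toList))
      .flatMap(Stream.emits)

    (aggregate ++ emit).map { case (k, v) => s"$k,$v".getBytes(StandardCharsets.UTF_8) }
  }
}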
@@ -63,3 +99,62 @@ trait Aggregations[I] extends Source[I] with Sink[(String, Long)] { self: Proces
       .map(e => (e.getKey, e.getValue))
 
 }
+
+//object Main {
+//  def main(args: Array[String]): Unit = {
+//    class A extends Processor with Aggregations[Int] {
+//      override protected def sourceStream(processorContext: ProcessorContext): Stream[IO, Int] = fs2.Stream(1, 2, 3, 4)
+////      override val repository: ConcurrentMap[String, Long] = new ConcurrentHashMap[String, Long]()
+//      override def key(a: Int): String = a.toString
+//      override def value(a: Int): Long = 1L
+//      override def serializeAggregatedKV(o2: (String, Long)): Array[Byte] = s"${o2._1},${o2._2}".getBytes(StandardCharsets.UTF_8)
+//    }
+//  }
+//}
+
+object Main2 {
+  def main(args: Array[String]): Unit = {
+    class A extends Processor {
+      override protected def stream(processorContext: ProcessorContext): Stream[IO, Array[Byte]] =
+        Stream(1, 2, 3, 4, 5).map(_.toString.getBytes)
+    }
+  }
+}
+
+object Main3 {
+  def main(args: Array[String]): Unit = {
+    class A extends Processor with HiveSource[Int] {
+      override protected val driver: String = "Mysql"
+      override protected val jdbcUrl: String = "jdbc://blah"
+
+      override protected def query(processorContext: ProcessorContext): query.Query0[Int] = ???
+
+      override def serializeRow(o2: Int): Array[Byte] = o2.toString.getBytes
+    }
+  }
+}
+
+S
+I
+O
+Source[S, I, O] {
+  1. sourceStream[I]
+  2. aggegation: noop MapDB[I, Long]
+  3. serialize(I, PartialRow)
+}
+
+object Main4 {
+  def main(args: Array[String]): Unit = {
+    class A extends Processor with SQLSource[Int] {
+      override protected val driver: String = "Mysql"
+      override protected val jdbcUrl: String = "jdbc://blah"
+
+      override protected def query(processorContext: ProcessorContext): query.Query0[Int] = ???
+
+      override def serializeRow(o2: Int): Array[Byte] = null
+
+      override protected def stream(processorContext: ProcessorContext): Stream[IO, Array[Byte]] =
+
+    }
+  }
+}
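
The Feed trait added above fixes the intended pipeline shape: a typed input stream, an aggregation pipe to (A, Long), and a serializer, composed once in a final stream method. Its default transform is left unfinished in this commit, so the following is a self-contained sketch of the same composition with a working fold-based pipe; FeedSketch and WordCountFeed are illustrative names, and the ProcessorContext parameter is omitted to keep the sketch standalone.

import java.nio.charset.StandardCharsets

import cats.effect.IO
import fs2.{Pipe, Stream}

trait FeedSketch[I, A] {
  def inputStream: Stream[IO, I]
  def transform: Pipe[IO, I, (A, Long)]
  def serialize(a: (A, Long)): Array[Byte]

  // Same wiring as Feed#stream: input -> aggregation -> bytes.
  final def stream: Stream[IO, Array[Byte]] =
    inputStream.through(transform).map(serialize)
}

// Example feed: counts occurrences of each word with an in-memory fold.
object WordCountFeed extends FeedSketch[String, String] {
  def inputStream: Stream[IO, String] = Stream("a", "b", "a")

  def transform: Pipe[IO, String, (String, Long)] =
    _.fold(Map.empty[String, Long]) { (acc, word) =>
      acc.updated(word, acc.getOrElse(word, 0L) + 1L)
    }.flatMap(m => Stream.emits(m.toSeq))

  def serialize(a: (String, Long)): Array[Byte] =
    s"${a._1},${a._2}".getBytes(StandardCharsets.UTF_8)
}

Running WordCountFeed.stream.compile.toList yields the serialized pairs ("a",2) and ("b",1), which is the contract the real Feed would satisfy once Aggregate.apply is wired in.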

src/main/scala/com/intenthq/action_processor/integrationsV2/SQLSource.scala

Lines changed: 7 additions & 4 deletions
@@ -1,13 +1,14 @@
-package com.intenthq.hybrid.integrationsV2
+package com.intenthq.action_processor.integrationsV2
 
 import java.util.concurrent.ForkJoinPool
 
 import cats.effect.{ContextShift, IO}
+import com.intenthq.action_processor.integrations.DoobieImplicits
 import com.intenthq.hybrid.integrations.DoobieImplicits
+import com.intenthq.hybrid.integrationsV2.ProcessorContext
 import doobie.util.query.Query0
 import doobie.util.transactor.Transactor
 import doobie.util.transactor.Transactor.Aux
-import fs2._
 
 trait SQLSource[I] extends Processor with Source[I] with DoobieImplicits {
 
@@ -17,6 +18,8 @@ trait SQLSource[I] extends Processor with Source[I] with DoobieImplicits {
 
   protected def query(processorContext: ProcessorContext): Query0[I]
 
+  def serializeRow(row: I): Array[Byte]
+
   implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
 
   protected def createTransactor: Aux[IO, Unit] = Transactor.fromDriverManager[IO](driver, jdbcUrl)
 
@@ -25,11 +28,11 @@ trait SQLSource[I] extends Processor with Source[I] with DoobieImplicits {
 
   protected val chunkSize: Int = doobie.util.query.DefaultChunkSize
 
-  override protected def sourceStream(processorContext: ProcessorContext): Stream[IO, I] =
+  override def stream(processorContext: ProcessorContext): fs2.Stream[IO, Array[Byte]] =
     query(processorContext)
       .streamWithChunkSize(chunkSize)
       .transact[IO](transactor)
-
+      .map(serializeRow)
 }
 
 object SQLSource {
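
The net effect of this file's change is that a SQL source now serializes its own rows via serializeRow instead of delegating to a separate Sink. A minimal sketch of the same shape written directly against doobie, assuming cats-effect 2 and an H2 in-memory database; RowSourceSketch and the users table are hypothetical.

import cats.effect.{ContextShift, IO}
import doobie.implicits._
import doobie.util.transactor.Transactor
import fs2.Stream

import scala.concurrent.ExecutionContext

object RowSourceSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val xa: Transactor.Aux[IO, Unit] =
    Transactor.fromDriverManager[IO]("org.h2.Driver", "jdbc:h2:mem:test")

  def serializeRow(row: (Int, String)): Array[Byte] =
    s"${row._1},${row._2}".getBytes("UTF-8")

  // Stream rows from the query, then serialize at the source --
  // the same shape as the new SQLSource#stream.
  val bytes: Stream[IO, Array[Byte]] =
    sql"SELECT id, name FROM users"
      .query[(Int, String)]
      .streamWithChunkSize(doobie.util.query.DefaultChunkSize)
      .transact(xa)
      .map(serializeRow)
}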
