@@ -4,6 +4,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 
 import cats.effect.{ContextShift, IO, Resource}
+import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
 import doobie.implicits.{toDoobieStreamOps, toSqlInterpolator}
 import doobie.util.query.Query0
 import doobie.util.transactor.Transactor
@@ -23,27 +24,31 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
 
 object Aggregate {
 
-  def apply[I, K](/* feedContext: FeedContext,*/ key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
+  def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)
+
+  private def loadAggRepository[K]: Resource[IO, ConcurrentMap[K, Long]] =
+    Resource.pure(new ConcurrentHashMap[K, Long]())
+
+  def aggregateByKey[I, K](/* feedContext: FeedContext,*/ key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
     sourceStream => {
-      val repository: ConcurrentMap[K, Long] = new ConcurrentHashMap[K, Long]()
 
-      def put(o: I): IO[Unit] =
+      def put(aggRepository: ConcurrentMap[K, Long], o: I): IO[Unit] =
         IO.delay {
-          val previousCounter = repository.getOrDefault(key(o), 0L)
-          repository.put(key(o), counter(o) + previousCounter)
+          val previousCounter = aggRepository.getOrDefault(key(o), 0L)
+          aggRepository.put(key(o), counter(o) + previousCounter)
         }.void
 
-      def streamKeyValue: fs2.Stream[IO, (K, Long)] =
+      def streamKeyValue(aggRepository: ConcurrentMap[K, Long]): fs2.Stream[IO, (K, Long)] =
         fs2.Stream
           .fromIterator[IO](
-            repository
+            aggRepository
              .entrySet()
              .iterator()
              .asScala
          )
          .map(e => (e.getKey, e.getValue))
 
-      fs2.Stream.resource[IO, ConcurrentMap[K, Long]](Resource.liftF(IO.delay(repository))).flatMap { _ =>
+      fs2.Stream.resource[IO, ConcurrentMap[K, Long]](loadAggRepository).flatMap { aggRepository =>
        sourceStream.evalMap { i =>
          put(i)
        }.drain ++ streamKeyValue
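For reference, a minimal standalone sketch of what the new Aggregate.aggregateByKey pipe is meant to compute, assuming fs2 2.x and cats-effect 2.x on the classpath. It folds over an immutable Map instead of the ConcurrentHashMap resource used in the commit, and every name in it is illustrative rather than taken from the repository.

// Standalone illustration (not the commit's code): aggregate a stream by key,
// summing a per-element counter, then emit the (key, total) pairs.
import cats.effect.IO
import fs2.Stream

object AggregateByKeySketch {

  def aggregateByKey[I, K](key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
    source =>
      source
        .fold(Map.empty[K, Long]) { (acc, i) =>
          acc.updated(key(i), acc.getOrElse(key(i), 0L) + counter(i))
        }
        .flatMap(totals => Stream.emits(totals.toSeq))

  def main(args: Array[String]): Unit = {
    val totals = Stream
      .emits(List(("ACME", 2L), ("ACME", 3L), ("Initech", 1L)))
      .covary[IO]
      .through(aggregateByKey[(String, Long), String](_._1, _._2))
      .compile
      .toList
      .unsafeRunSync()
    println(totals) // e.g. List((ACME,5), (Initech,1))
  }
}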
@@ -53,65 +58,42 @@ object Aggregate {
 
 trait Feed[I, A] {
   def inputStream(feedContext: FeedContext): fs2.Stream[IO, I]
-  def transform(feedContext: FeedContext): fs2.Pipe[IO, I, A]
-  def serialize(a: A): Array[Byte]
+  def transform(feedContext: FeedContext): fs2.Pipe[IO, I, (A, Long)]
+  def serialize(a: A, counter: Long): Array[Byte]
 
   final def stream(processorContext: FeedContext): fs2.Stream[IO, Array[Byte]] =
     inputStream(processorContext)
       .through(transform(processorContext))
-      .map(serialize)
-}
-
-abstract class SQLFeed[I, O] extends Feed[I, O] {
-  protected val jdbcUrl: String
-
-  protected val driver: String
-
-  protected def query(feedContext: FeedContext): Query0[I]
-
-  override def inputStream(feedContext: FeedContext): fs2.Stream[IO, I] =
-    query(feedContext)
-      .streamWithChunkSize(chunkSize)
-      .transact[IO](transactor)
-
-  implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
-
-  protected def createTransactor: Aux[IO, Unit] = Transactor.fromDriverManager[IO](driver, jdbcUrl)
-
-  protected lazy val transactor: Transactor[IO] = createTransactor
-
-  protected val chunkSize: Int = doobie.util.query.DefaultChunkSize
+      .map { case (a, counter) => serialize(a, counter) }
 }
 
-abstract class Hive[I, O] extends SQLFeed[I, O] {
-
-  override protected val jdbcUrl: String = ""
-
-  override protected val driver: String = ""
-
+trait NoAggregate[I] { self: Feed[I, I] =>
+  override def transform(feedContext: FeedContext): fs2.Pipe[IO, I, (I, Long)] = Aggregate.noop
 }
 
 object Main {
   def main(args: Array[String]): Unit = {
 
-    class NoAggCase extends Hive[Int, String] {
+    case class Person(name: String, address: String, score: Int) {
+      lazy val aggregateKey = new AggregatedPerson(name, address)
+    }
+    case class AggregatedPerson(name: String, address: String)
 
-      override protected def query(feedContext: FeedContext): Query0[Int] = sql"1".query[Int]
+    class PersonFeed extends HiveFeed[Person, Person] with NoAggregate[Person] {
 
-      override def transform(feedContext: FeedContext): Pipe[IO, Int, String] = s => s.map(_.toString)
+      override protected def query(feedContext: FeedContext): Query0[Person] = sql"SELECT 'Nic Cage', 9000".query[Person]
 
-      override def serialize(a: String): Array[Byte] = a.getBytes(StandardCharsets.UTF_8)
+      override def serialize(a: Person, counter: Long): Array[Byte] = CsvSerialization.serialize(a).unsafeRunSync()
     }
 
-    class AggCase extends Hive[Int, (String, Long)] {
+    class PersonsAggregatedByScoreFeed extends HiveFeed[Person, AggregatedPerson] {
 
-      override protected def query(feedContext: FeedContext): Query0[Int] = sql"1".query[Int]
+      override protected def query(feedContext: FeedContext): Query0[Person] = sql"SELECT 'Nic Cage', 9000".query[Person]
 
-      override def transform(feedContext: FeedContext): Pipe[IO, Int, (String, Long)] = Aggregate.apply(_.toString, _ => 1L)
+      override def transform(feedContext: FeedContext): Pipe[IO, Person, (AggregatedPerson, Long)] =
+        Aggregate.aggregateByKey[Person, AggregatedPerson](_.aggregateKey, _.score)
 
-      override def serialize(a: (String, Long)): Array[Byte] = a._1.getBytes(StandardCharsets.UTF_8)
+      override def serialize(a: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((a, counter)).unsafeRunSync()
     }
-
-    new AggCase().stream(new FeedContext()).compile.drain.unsafeRunSync()
   }
 }
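And a self-contained sketch of how the reshaped Feed contract fits together end to end, again assuming fs2 2.x / cats-effect 2.x. FeedContext, HiveFeed and CsvSerialization are repository types not reproduced here, so FeedContext appears as a bare stand-in; NoAggregate extends Feed directly (instead of the commit's self-type) purely to keep the sketch compilable on its own, and all other names are illustrative.

// Standalone illustration (not the commit's code): transform now yields (value, counter)
// pairs, and serialize receives the counter alongside the value.
import java.nio.charset.StandardCharsets

import cats.effect.IO
import fs2.Stream

object FeedSketch {

  final class FeedContext // stand-in for the repository's FeedContext

  trait Feed[I, A] {
    def inputStream(feedContext: FeedContext): Stream[IO, I]
    def transform(feedContext: FeedContext): fs2.Pipe[IO, I, (A, Long)]
    def serialize(a: A, counter: Long): Array[Byte]

    // Wire the three steps: read, aggregate (or not), then serialize each pair.
    final def stream(feedContext: FeedContext): Stream[IO, Array[Byte]] =
      inputStream(feedContext)
        .through(transform(feedContext))
        .map { case (a, counter) => serialize(a, counter) }
  }

  // No aggregation: every element passes through paired with a counter of 1.
  trait NoAggregate[I] extends Feed[I, I] {
    override def transform(feedContext: FeedContext): fs2.Pipe[IO, I, (I, Long)] =
      _.map(_ -> 1L)
  }

  final class NamesFeed extends NoAggregate[String] {
    override def inputStream(feedContext: FeedContext): Stream[IO, String] =
      Stream.emits(List("Nic Cage", "John Travolta")).covary[IO]

    override def serialize(a: String, counter: Long): Array[Byte] =
      s"$a,$counter\n".getBytes(StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit =
    new NamesFeed()
      .stream(new FeedContext)
      .compile
      .toList
      .unsafeRunSync()
      .foreach(bytes => print(new String(bytes, StandardCharsets.UTF_8))) // "Nic Cage,1" etc.
}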