@@ -2,20 +2,27 @@ package com.intenthq.action_processor.integrations.aggregations
 
 import java.util.concurrent.ConcurrentMap
 
-import cats.effect.{Blocker, ContextShift, IO, Resource, SyncIO}
+import scala.concurrent.duration.{DurationInt, FiniteDuration, NANOSECONDS}
+
+import cats.effect.{Blocker, ContextShift, IO, Resource, Sync, SyncIO, Timer}
+
 import com.intenthq.action_processor.integrations.config.MapDbSettings
 import com.intenthq.action_processor.integrations.feeds.FeedContext
 import com.intenthq.action_processor.integrations.repositories.MapDBRepository
+
 import org.mapdb.elsa.{ElsaMaker, ElsaSerializer}
 import org.mapdb.serializer.GroupSerializerObjectArray
 import org.mapdb.{DataInput2, DataOutput2, HTreeMap, Serializer}
-
 import scala.jdk.CollectionConverters._
 
+import cats.implicits._
+
 object Aggregate {
 
   private lazy val blocker = Blocker[SyncIO].allocated.unsafeRunSync()._1
-  implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
+  private val ec = scala.concurrent.ExecutionContext.global
+  implicit private val contextShift: ContextShift[IO] = IO.contextShift(ec)
+  implicit private val timer: Timer[IO] = IO.timer(ec)
 
   def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)
 
@@ -38,32 +45,83 @@ object Aggregate {
 
   def aggregateByKeys[I, K](feedContext: FeedContext[IO], keys: I => List[K], counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
     sourceStream => {
+      val aggregateInRepository: fs2.Pipe[IO, I, ConcurrentMap[K, Long]] =
+        in => {
+          fs2.Stream
+            .resource[IO, ConcurrentMap[K, Long]](loadAggRepository(feedContext.mapDbSettings)(blocker))
+            .flatTap { aggRepository =>
+              fs2.Stream.eval_(IO.delay(println("Starting aggregation"))) ++
+                in.evalTap { o =>
+                  IO.sleep(1.second) >>
+                    IO.delay {
+                      keys(o).foreach { value =>
+                        val previousCounter = aggRepository.getOrDefault(value, 0L)
+                        aggRepository.put(value, counter(o) + previousCounter)
+                      }
+                    }
+                }.through(AggregationsProgress.showAggregationProgress(1.millis))
+                  .as(1)
+                  .foldMonoid
+                  .evalMap(n => IO.delay(println(s"Finished aggregation of $n rows")))
+            }
+        }
 
-      def put(aggRepository: ConcurrentMap[K, Long], o: I): IO[Unit] =
-        IO.delay {
-          keys(o).foreach { value =>
-            val previousCounter = aggRepository.getOrDefault(value, 0L)
-            aggRepository.put(value, counter(o) + previousCounter)
-          }
-        }.void
-
-      def streamKeyValue(aggRepository: ConcurrentMap[K, Long]): fs2.Stream[IO, (K, Long)] =
-        fs2.Stream
-          .fromIterator[IO](
-            aggRepository
-              .entrySet()
-              .iterator()
-              .asScala
-          )
-          .map(e => (e.getKey, e.getValue))
+      val streamAggRepository: fs2.Pipe[IO, ConcurrentMap[K, Long], (K, Long)] =
+        _.flatMap(aggRepository => fs2.Stream.iterable(aggRepository.asScala))
 
-      fs2.Stream.resource[IO, ConcurrentMap[K, Long]](loadAggRepository(feedContext.mapDbSettings)(blocker)).flatMap { aggRepository =>
-        sourceStream.evalMap { i =>
-          put(aggRepository, i)
-        }.drain ++ streamKeyValue(aggRepository)
-      }
+      sourceStream.through(aggregateInRepository).through(streamAggRepository)
     }
 
   def aggregateByKey[I, K](feedContext: FeedContext[IO], key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
     aggregateByKeys(feedContext, key.andThen(List(_)), counter)
 }
+
+object AggregationsProgress {
+  def showAggregationProgress[F[_]: Sync: Timer, O](duration: FiniteDuration): fs2.Pipe[F, O, O] = { in =>
+    val startTime = System.nanoTime()
+    var lastTime = System.nanoTime()
+    var lastRow = 0L
+    def formatTime(duration: FiniteDuration): String = {
+      val durationSecs = duration.toSeconds
+      f"${durationSecs / 3600}%d:${(durationSecs % 3600) / 60}%02d:${durationSecs % 60}%02d"
+    }
+    in.through(showProgress(duration) {
+      case (totalRows, o) =>
+        Sync[F].delay {
+          val now = System.nanoTime()
+          val totalTime = FiniteDuration(now - startTime, NANOSECONDS)
+          val partialTime = FiniteDuration(now - lastTime, NANOSECONDS)
+          val partialRows = totalRows - lastRow
+          lastTime = System.nanoTime()
+          lastRow = totalRows
+
+          println(f"\nRow #$totalRows: ${o.toString}")
+          println(f"Partial time: ${formatTime(partialTime)}. Total time: ${formatTime(totalTime)}")
+          println(
+            f"Partial speed: ${partialRows.toFloat / partialTime.toSeconds}%.2f rows/sec. Total Speed: ${totalRows.toFloat / totalTime.toSeconds}%.2f rows/sec"
+          )
+
+        }
+    })
+  }
+
+  def showProgress[F[_]: Sync: Timer, O](every: FiniteDuration)(output: (Long, O) => F[Unit]): fs2.Pipe[F, O, O] = { source =>
+    val ticks = fs2.Stream.every[F](every)
+    source
+      // Based on zipWithIndex, but starting at 1
+      .scanChunks(1L) { (index, c) =>
+        var idx = index
+        val out = c.map { o =>
+          val r = (o, idx)
+          idx += 1
+          r
+        }
+        (idx, out)
+      }
+      .zipWith(ticks)((_, _))
+      .evalMap {
+        case ((v, index), isTick) =>
+          (if (isTick) output(index, v) else Sync[F].unit) >> Sync[F].pure(v)
+      }
+  }
+}
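
For reference, below is a minimal, self-contained sketch (not part of this commit) of how the new AggregationsProgress.showProgress pipe can be attached to an arbitrary fs2 stream. It assumes the same cats-effect 2.x / fs2 2.x stack used in the file above; the ShowProgressExample object name, the 0-to-1000 integer source, the 5-second reporting interval and the log message are purely illustrative.

import scala.concurrent.duration.DurationInt

import cats.effect.{IO, Timer}

import com.intenthq.action_processor.integrations.aggregations.AggregationsProgress

object ShowProgressExample {
  // Timer[IO] is needed by fs2.Stream.every inside showProgress
  implicit private val timer: Timer[IO] = IO.timer(scala.concurrent.ExecutionContext.global)

  def main(args: Array[String]): Unit =
    fs2.Stream
      .range(0, 1000)      // hypothetical source of rows
      .covary[IO]
      // Roughly every 5 seconds, report how many elements have passed through so far
      .through(AggregationsProgress.showProgress[IO, Int](5.seconds) { (rowNumber, row) =>
        IO.delay(println(s"Processed $rowNumber rows so far, last row: $row"))
      })
      .compile
      .drain
      .unsafeRunSync()
}

Because showProgress is polymorphic in F[_]: Sync: Timer, the same pipe works unchanged inside aggregateByKeys (where it is specialised to IO) and in tests with any other effect that has those instances.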