@@ -2,27 +2,23 @@ package com.intenthq.action_processor.integrations.aggregations

import java.util.concurrent.ConcurrentMap

-import scala.concurrent.duration.{DurationInt, FiniteDuration, NANOSECONDS}
+import scala.jdk.CollectionConverters._

-import cats.effect.{Blocker, ContextShift, IO, Resource, Sync, SyncIO, Timer}
+import cats.effect.{Blocker, ContextShift, IO, Resource, SyncIO}

import com.intenthq.action_processor.integrations.config.MapDbSettings
import com.intenthq.action_processor.integrations.feeds.FeedContext
import com.intenthq.action_processor.integrations.repositories.MapDBRepository

+import org.mapdb.{DataInput2, DataOutput2, HTreeMap, Serializer}
import org.mapdb.elsa.{ElsaMaker, ElsaSerializer}
import org.mapdb.serializer.GroupSerializerObjectArray
-import org.mapdb.{DataInput2, DataOutput2, HTreeMap, Serializer}
-import scala.jdk.CollectionConverters._
-
-import cats.implicits._

object Aggregate {

  private lazy val blocker = Blocker[SyncIO].allocated.unsafeRunSync()._1
  private val ec = scala.concurrent.ExecutionContext.global
  implicit private val contextShift: ContextShift[IO] = IO.contextShift(ec)
-  implicit private val timer: Timer[IO] = IO.timer(ec)

  def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)

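For reference, loadAggRepository (used in the hunk below) yields a MapDB-backed ConcurrentMap. A minimal sketch of that kind of map, assuming MapDB 3.x and an in-memory store; this is an illustration, not the project's actual MapDBRepository implementation, and the name aggMap and the map name "aggregate" are hypothetical:

    import org.mapdb.{DBMaker, Serializer}

    // In-memory MapDB database; the real repository is configured via MapDbSettings.
    val db = DBMaker.memoryDB().make()
    // HTreeMap implements java.util.concurrent.ConcurrentMap, which is what the pipes below consume.
    val aggMap: java.util.concurrent.ConcurrentMap[String, java.lang.Long] =
      db.hashMap("aggregate", Serializer.STRING, Serializer.LONG).createOrOpen()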
@@ -45,26 +41,30 @@ object Aggregate {

  def aggregateByKeys[I, K](feedContext: FeedContext[IO], keys: I => List[K], counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
    sourceStream => {
+
+      // This pipe aggregates all the elements and returns a single Map as the aggregate repository
      val aggregateInRepository: fs2.Pipe[IO, I, ConcurrentMap[K, Long]] =
        in => {
          fs2.Stream
            .resource[IO, ConcurrentMap[K, Long]](loadAggRepository(feedContext.mapDbSettings)(blocker))
-            .flatTap { aggRepository =>
+            .flatMap { aggRepository =>
              fs2.Stream.eval_(IO.delay(println("Starting aggregation"))) ++
-                in.evalTap { o =>
+                in.evalMapChunk { o =>
                  IO.delay {
                    keys(o).foreach { value =>
                      val previousCounter = aggRepository.getOrDefault(value, 0L)
                      aggRepository.put(value, counter(o) + previousCounter)
                    }
+                    aggRepository
                  }
-                }.through(AggregationsProgress.showAggregationProgress(5.seconds))
-                  .as(1)
-                  .foldMonoid
-                  .evalMap(n => IO.delay(println(s"Finished aggregation of $n rows")))
+                }
+                  // Returns the last aggRepository together with the count of rows processed
+                  .fold((aggRepository, 0L)) { case ((_, previousRows), aggRepository) => (aggRepository, previousRows + 1) }
+                  .evalMapChunk { case (aggRepository, n) => IO.delay(println(s"Finished aggregation of $n rows")).as(aggRepository) }
            }
        }

+      // Streams the given aggregate repository's entries
      val streamAggRepository: fs2.Pipe[IO, ConcurrentMap[K, Long], (K, Long)] =
        _.flatMap(aggRepository => fs2.Stream.iterable(aggRepository.asScala))

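The fold introduced above replaces the removed AggregationsProgress reporter: every input row updates the shared repository and re-emits it, and the fold keeps the latest repository reference while counting the rows that flowed through. A minimal standalone sketch of that pattern, assuming fs2 2.x on cats-effect 2.x and an ordinary ConcurrentHashMap standing in for the MapDB store (repo and counted are illustrative names):

    import cats.effect.IO
    import fs2.Stream

    val repo = new java.util.concurrent.ConcurrentHashMap[String, Long]()

    val counted: Stream[IO, (java.util.concurrent.ConcurrentHashMap[String, Long], Long)] =
      Stream
        .emits(List("a", "b", "a"))
        // Update the shared map and re-emit it: exactly one output per input row
        .evalMapChunk { k =>
          IO.delay {
            val previous = repo.getOrDefault(k, 0L)
            repo.put(k, previous + 1L)
            repo
          }
        }
        // Keep the latest repository reference while counting the rows seen
        .fold((repo, 0L)) { case ((_, rows), r) => (r, rows + 1L) }

Since the map is mutated in place, every emitted reference is the same object; the fold's only real work is the row count, which is why the pipe can print "Finished aggregation of $n rows" at the end.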
@@ -74,53 +74,3 @@ object Aggregate {

  def aggregateByKey[I, K](feedContext: FeedContext[IO], key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
    aggregateByKeys(feedContext, key.andThen(List(_)), counter)
}
-
-object AggregationsProgress {
-  def showAggregationProgress[F[_]: Sync: Timer, O](duration: FiniteDuration): fs2.Pipe[F, O, O] = { in =>
-    val startTime = System.nanoTime()
-    var lastTime = System.nanoTime()
-    var lastRow = 0L
-    def formatTime(duration: FiniteDuration): String = {
-      val durationSecs = duration.toSeconds
-      f"${durationSecs / 3600}%d:${(durationSecs % 3600) / 60}%02d:${durationSecs % 60}%02d"
-    }
-    in.through(showProgress(duration) {
-      case (totalRows, o) =>
-        Sync[F].delay {
-          val now = System.nanoTime()
-          val totalTime = FiniteDuration(now - startTime, NANOSECONDS)
-          val partialTime = FiniteDuration(now - lastTime, NANOSECONDS)
-          val partialRows = totalRows - lastRow
-          lastTime = System.nanoTime()
-          lastRow = totalRows
-
-          println(f"\nRow #$totalRows: ${o.toString}")
-          println(f"Partial time: ${formatTime(partialTime)}. Total time: ${formatTime(totalTime)}")
-          println(
-            f"Partial speed: ${partialRows.toFloat / partialTime.toSeconds}%.2f rows/sec. Total speed: ${totalRows.toFloat / totalTime.toSeconds}%.2f rows/sec"
-          )
-
-        }
-    })
-  }
-
-  def showProgress[F[_]: Sync: Timer, O](every: FiniteDuration)(output: (Long, O) => F[Unit]): fs2.Pipe[F, O, O] = { source =>
-    val ticks = fs2.Stream.every[F](every)
-    source
-      // Based on zipWithIndex but starting at 1
-      .scanChunks(1L) { (index, c) =>
-        var idx = index
-        val out = c.map { o =>
-          val r = (o, idx)
-          idx += 1
-          r
-        }
-        (idx, out)
-      }
-      .zipWith(ticks)((_, _))
-      .evalMap {
-        case ((v, index), isTick) =>
-          (if (isTick) output(index, v) else Sync[F].unit) >> Sync[F].pure(v)
-      }
-  }
-}
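A hypothetical caller of the resulting API, assuming a FeedContext[IO] named feedContext is available from the surrounding feed machinery; wordCounts is an illustrative name, using each element as its own key and a counter of 1 per row:

    import cats.effect.IO

    // Count occurrences of each distinct string flowing through the pipe.
    def wordCounts(feedContext: FeedContext[IO]): fs2.Pipe[IO, String, (String, Long)] =
      Aggregate.aggregateByKey[String, String](feedContext, identity, _ => 1L)

    // fs2.Stream.emits(List("foo", "bar", "foo")).through(wordCounts(feedContext))
    // would emit ("foo", 2) and ("bar", 1), in no guaranteed order, once the
    // source stream completes and the repository's entries are streamed out.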