File tree Expand file tree Collapse file tree 1 file changed +6
-7
lines changed
src/main/scala/com/intenthq/action_processor/integrations/aggregations Expand file tree Collapse file tree 1 file changed +6
-7
lines changed Original file line number Diff line number Diff line change @@ -52,14 +52,13 @@ object Aggregate {
52
52
.flatTap { aggRepository =>
53
53
fs2.Stream .eval_(IO .delay(println(" Starting aggregation" ))) ++
54
54
in.evalTap { o =>
55
- IO .sleep(1 .second) >>
56
- IO .delay {
57
- keys(o).foreach { value =>
58
- val previousCounter = aggRepository.getOrDefault(value, 0L )
59
- aggRepository.put(value, counter(o) + previousCounter)
60
- }
55
+ IO .delay {
56
+ keys(o).foreach { value =>
57
+ val previousCounter = aggRepository.getOrDefault(value, 0L )
58
+ aggRepository.put(value, counter(o) + previousCounter)
61
59
}
62
- }.through(AggregationsProgress .showAggregationProgress(1 .millis))
60
+ }
61
+ }.through(AggregationsProgress .showAggregationProgress(5 .seconds))
63
62
.as(1 )
64
63
.foldMonoid
65
64
.evalMap(n => IO .delay(println(s " Finished aggregation of $n rows " )))
You can’t perform that action at this time.
0 commit comments