Skip to content

Commit c801954

Browse files
Merlin Rabens (merlinrabens)
authored and committed
Add feed context to transform method
Signed-off-by: Merlin Rabens <merlin.rabens@intenthq.com>
1 parent cb5a0ba commit c801954

File tree

11 files changed

+81
-57
lines changed

11 files changed

+81
-57
lines changed

src/main/scala/com/intenthq/action_processor/integrationsV2/aggregations/Aggregate.scala

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,40 @@
11
package com.intenthq.action_processor.integrationsV2.aggregations
22

3-
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
3+
import java.util.concurrent.ConcurrentMap
44

55
import cats.effect.{IO, Resource}
6+
import com.intenthq.action_processor.integrationsV2.config.MapDbSettings
7+
import com.intenthq.action_processor.integrationsV2.feeds.FeedContext
8+
import com.intenthq.action_processor.integrationsV2.repositories.MapDBRepository
9+
import org.mapdb.elsa.{ElsaMaker, ElsaSerializer}
10+
import org.mapdb.serializer.GroupSerializerObjectArray
11+
import org.mapdb.{DataInput2, DataOutput2, HTreeMap, Serializer}
612

713
import scala.jdk.CollectionConverters._
814

915
object Aggregate {
1016

1117
def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)
1218

13-
private def loadAggRepository[K]: Resource[IO, ConcurrentMap[K, Long]] =
14-
// TODO: Insert MapDB here ...
15-
Resource.liftF(IO.pure(new ConcurrentHashMap[K, Long]()))
19+
private def loadAggRepository[K](mapDbSettings: MapDbSettings): Resource[IO, HTreeMap[K, Long]] = {
1620

17-
def aggregateByKey[I, K](key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
21+
val serializer: Serializer[K] = new GroupSerializerObjectArray[K] {
22+
val elsaSerializer: ElsaSerializer = new ElsaMaker().make
23+
override def deserialize(input: DataInput2, available: Int): K = elsaSerializer.deserialize[K](input)
24+
override def serialize(out: DataOutput2, value: K): Unit = elsaSerializer.serialize(out, value)
25+
}
26+
27+
MapDBRepository
28+
.load(mapDbSettings)
29+
.map(db =>
30+
db.hashMap("stuff", serializer, Serializer.LONG)
31+
.layout(mapDbSettings.segments, mapDbSettings.nodeSize, mapDbSettings.levels)
32+
.createOrOpen()
33+
.asInstanceOf[HTreeMap[K, Long]]
34+
)
35+
}
36+
37+
def aggregateByKey[I, K](feedContext: FeedContext[IO], key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
1838
sourceStream => {
1939

2040
def put(aggRepository: ConcurrentMap[K, Long], o: I): IO[Unit] =
@@ -33,7 +53,7 @@ object Aggregate {
3353
)
3454
.map(e => (e.getKey, e.getValue))
3555

36-
fs2.Stream.resource[IO, ConcurrentMap[K, Long]](loadAggRepository).flatMap { aggRepository =>
56+
fs2.Stream.resource[IO, ConcurrentMap[K, Long]](loadAggRepository(feedContext.mapDbSettings)).flatMap { aggRepository =>
3757
sourceStream.evalMap { i =>
3858
put(aggRepository, i)
3959
}.drain ++ streamKeyValue(aggRepository)
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.intenthq.action_processor.integrationsV2.aggregations
22

33
import cats.effect.IO
4-
import com.intenthq.action_processor.integrationsV2.feeds.Feed
4+
import com.intenthq.action_processor.integrationsV2.feeds.{Feed, FeedContext}
5+
6+
import scala.annotation.unused
57

68
trait NoAggregate[I] {
79
self: Feed[I, I] =>
8-
override def transform: fs2.Pipe[IO, I, (I, Long)] = Aggregate.noop
10+
override def transform(@unused feedContext: FeedContext[IO]): fs2.Pipe[IO, I, (I, Long)] = Aggregate.noop
911
}

src/main/scala/com/intenthq/action_processor/integrationsV2/config/MapDBSettings.scala

Lines changed: 0 additions & 17 deletions
This file was deleted.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.intenthq.action_processor.integrationsV2.config
2+
3+
import java.nio.file.Path
4+
5+
case class MapDbSettings(dbPath: Path, startDbSize: Long, incSize: Long, segments: Int, nodeSize: Int, levels: Int)

src/main/scala/com/intenthq/action_processor/integrationsV2/feeds/Feed.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import cats.effect.IO
55
trait Feed[I, O] {
66

77
def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, I]
8-
def transform: fs2.Pipe[IO, I, (O, Long)]
8+
def transform(feedContext: FeedContext[IO]): fs2.Pipe[IO, I, (O, Long)]
99
def serialize(o: O, counter: Long): Array[Byte]
1010

1111
final def stream(feedContext: FeedContext[IO]): fs2.Stream[IO, Array[Byte]] =
1212
inputStream(feedContext)
13-
.through(transform)
13+
.through(transform(feedContext))
1414
.map((serialize _).tupled)
1515
}

src/main/scala/com/intenthq/action_processor/integrationsV2/feeds/FeedContext.scala

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,8 @@ package com.intenthq.action_processor.integrationsV2.feeds
22

33
import java.time.{LocalDate, LocalTime}
44

5-
import com.intenthq.action_processor.integrationsV2.config.MapDBSettings
5+
import com.intenthq.action_processor.integrationsV2.config.MapDbSettings
66
import com.intenthq.embeddings.Mapping
77

88
case class FeedFilter(date: Option[LocalDate], time: Option[LocalTime])
9-
10-
object FeedFilter {
11-
val empty: FeedFilter = FeedFilter(None, None)
12-
}
13-
14-
case class FeedContext[F[_]](embeddings: Option[Mapping[String, Array[Int], F]], filter: FeedFilter, mapDbSettings: MapDBSettings)
15-
16-
object FeedContext {
17-
def empty[F[_]]: FeedContext[F] = FeedContext[F](None, FeedFilter.empty, MapDBSettings.Default)
18-
}
9+
case class FeedContext[F[_]](embeddings: Option[Mapping[String, Array[Int], F]], filter: FeedFilter, mapDbSettings: MapDbSettings)

src/main/scala/com/intenthq/action_processor/integrationsV2/repositories/MapDBRepository.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@ import java.nio.file.Paths
55
import java.time.LocalDateTime
66

77
import cats.effect.{IO, Resource}
8-
import com.intenthq.action_processor.integrationsV2.config.MapDBSettings
8+
import com.intenthq.action_processor.integrationsV2.config.MapDbSettings
99
import org.mapdb.{DB, DBMaker}
1010

1111
object MapDBRepository {
12-
def load(mapDBSettings: MapDBSettings): Resource[IO, DB] = {
12+
def load(mapDBSettings: MapDbSettings): Resource[IO, DB] = {
1313
val dbCreated = for {
1414
now <- IO.delay(LocalDateTime.now())
1515
dbFile <- IO.delay(new File(Paths.get(mapDBSettings.dbPath.toString, s"dbb-${now.toLocalDate}-${now.toLocalTime}").toUri))
1616
_ <- IO.delay(dbFile.deleteOnExit())
1717
createDb <- IO.delay {
1818
DBMaker
1919
.fileDB(dbFile.getAbsolutePath)
20-
.allocateStartSize(mapDBSettings.startDbSize * 1024 * 1024 * 1024)
21-
.allocateIncrement(mapDBSettings.incSize * 1024 * 1024 * 1024)
20+
.allocateStartSize(mapDBSettings.startDbSize)
21+
.allocateIncrement(mapDBSettings.incSize)
2222
.fileMmapEnableIfSupported()
2323
.fileMmapPreclearDisable()
2424
.closeOnJvmShutdown()

src/test/scala/com/intenthq/action_processor/integrations/CSVFeedSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import cats.effect.{IO, Resource}
88
import cats.implicits._
99
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
1010
import com.intenthq.action_processor.integrationsV2.aggregations.NoAggregate
11-
import com.intenthq.action_processor.integrationsV2.feeds.{FeedContext, LocalFileCSVFeed}
11+
import com.intenthq.action_processor.integrationsV2.feeds.LocalFileCSVFeed
1212
import weaver.IOSuite
1313

1414
object CSVFeedSpec extends IOSuite with CSVFeedSpecResources {
@@ -29,7 +29,7 @@ object CSVFeedSpec extends IOSuite with CSVFeedSpecResources {
2929
).map(_ + '\n')
3030

3131
for {
32-
feedStreamLinesBytes <- resources.csvFeed.stream(FeedContext.empty).compile.toList
32+
feedStreamLinesBytes <- resources.csvFeed.stream(Defaults.feedContext).compile.toList
3333
feedStreamLines = feedStreamLinesBytes.map(bytes => new String(bytes, StandardCharsets.UTF_8)).toSet
3434
} yield expect(feedStreamLines == expectedResult)
3535
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.intenthq.action_processor.integrations
2+
3+
import java.nio.file.Paths
4+
5+
import com.intenthq.action_processor.integrationsV2.config.MapDbSettings
6+
import com.intenthq.action_processor.integrationsV2.feeds.{FeedContext, FeedFilter}
7+
8+
object Defaults {
9+
10+
val mapDbSettings: MapDbSettings = MapDbSettings(
11+
dbPath = Paths.get("/tmp"),
12+
startDbSize = 1L * 1024 * 1024,
13+
incSize = 1L,
14+
segments = 16,
15+
nodeSize = 128,
16+
levels = 4
17+
)
18+
19+
val feedFilter: FeedFilter = FeedFilter(None, None)
20+
21+
def feedContext[F[_]]: FeedContext[F] = FeedContext[F](None, feedFilter, mapDbSettings)
22+
}
Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.intenthq.action_processor.integrations
22

33
import java.nio.charset.StandardCharsets
4+
import java.time.LocalDateTime
45

56
import cats.effect.IO
67
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
@@ -13,33 +14,33 @@ object FeedAggregatedSpec extends SimpleIOSuite {
1314

1415
test("should return a stream of aggregated csv feed rows") {
1516
val aggregatedFeed = new PersonsAggregatedByScoreFeed(
16-
Person("Peter", "Big Street 1", 5),
17-
Person("Gabriela", "Big Street 2", 7),
18-
Person("Jolie", "Big Street 3", 4),
19-
Person("Peter", "Big Street 1", 6)
17+
Person("Peter", LocalDateTime.parse("2001-01-01T00:00:00"), 5),
18+
Person("Gabriela", LocalDateTime.parse("2002-01-01T00:00:00"), 7),
19+
Person("Jolie", LocalDateTime.parse("2003-01-01T00:00:00"), 4),
20+
Person("Peter", LocalDateTime.parse("2001-01-01T00:00:00"), 6)
2021
)
2122

2223
val expectedResult: Set[String] = Set(
23-
"Peter,Big Street 1,11",
24-
"Gabriela,Big Street 2,7",
25-
"Jolie,Big Street 3,4"
24+
"Peter,2001-01-01T00:00:00,11",
25+
"Gabriela,2002-01-01T00:00:00,7",
26+
"Jolie,2003-01-01T00:00:00,4"
2627
).map(_ + '\n')
2728

2829
for {
29-
feedStreamLinesBytes <- aggregatedFeed.stream(FeedContext.empty).compile.toList
30+
feedStreamLinesBytes <- aggregatedFeed.stream(Defaults.feedContext).compile.toList
3031
feedStreamLines = feedStreamLinesBytes.map(bytes => new String(bytes, StandardCharsets.UTF_8)).toSet
3132
} yield expect(feedStreamLines == expectedResult)
3233
}
3334
}
3435

35-
case class Person(name: String, address: String, score: Int)
36-
case class AggregatedPerson(name: String, address: String)
36+
case class Person(name: String, bornDate: LocalDateTime, score: Int)
37+
case class AggregatedPerson(name: String, bornDate: LocalDateTime)
3738

3839
class PersonsAggregatedByScoreFeed(persons: Person*) extends Feed[Person, AggregatedPerson] {
3940
override def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, Person] = fs2.Stream(persons: _*).covary[IO]
4041

41-
override def transform: Pipe[IO, Person, (AggregatedPerson, Long)] =
42-
Aggregate.aggregateByKey[Person, AggregatedPerson](person => AggregatedPerson(person.name, person.address), _.score.toLong)
42+
override def transform(feedContext: FeedContext[IO]): Pipe[IO, Person, (AggregatedPerson, Long)] =
43+
Aggregate.aggregateByKey[Person, AggregatedPerson](feedContext, person => AggregatedPerson(person.name, person.bornDate), _.score.toLong)
4344

4445
override def serialize(o: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((o, counter))
4546
}

0 commit comments

Comments
 (0)