Skip to content

Commit 638fd38

Browse files
committed
IC-40 Supports mapping file with multiple values
1 parent bfce314 commit 638fd38

File tree

6 files changed

+80
-31
lines changed

6 files changed

+80
-31
lines changed

build.sbt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,19 @@ lazy val root = (project in file("."))
1515
name := "action-processor-integrations",
1616
Compile / packageDoc / mappings := Seq(),
1717
Compile / packageSrc / mappings := Seq(),
18-
testFrameworks += new TestFramework("weaver.framework.TestFramework"),
18+
testFrameworks += new TestFramework("weaver.framework.CatsEffect"),
1919
scalafmtOnCompile := true,
2020
libraryDependencies ++= Seq(
21-
"co.fs2" %% "fs2-core" % "2.4.4",
22-
"co.fs2" %% "fs2-io" % "2.4.4",
21+
"co.fs2" %% "fs2-core" % "2.5.3",
22+
"co.fs2" %% "fs2-io" % "2.5.3",
2323
"com.propensive" %% "magnolia" % "0.17.0",
2424
"de.siegmar" % "fastcsv" % "1.0.3",
2525
"org.mapdb" % "mapdb" % "3.0.8",
2626
"org.tpolecat" %% "doobie-core" % "0.9.0",
2727
"org.tpolecat" %% "doobie-hikari" % "0.9.0",
2828
"com.google.guava" % "guava" % "30.0-jre",
29-
"com.disneystreaming" %% "weaver-framework" % "0.5.0" % "test",
30-
"org.tpolecat" %% "doobie-h2" % "0.9.0" % "test"
29+
"com.disneystreaming" %% "weaver-core" % "0.6.2" % Test,
30+
"com.disneystreaming" %% "weaver-cats" % "0.6.2" % Test,
31+
"org.tpolecat" %% "doobie-h2" % "0.9.0" % Test
3132
)
3233
)

src/main/scala/com/intenthq/action_processor/integrations/aggregations/Aggregate.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ object Aggregate {
3636
)
3737
}
3838

39-
def aggregateByKey[I, K](feedContext: FeedContext[IO], key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
39+
def aggregateByKeys[I, K](feedContext: FeedContext[IO], keys: I => List[K], counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
4040
sourceStream => {
4141

4242
def put(aggRepository: ConcurrentMap[K, Long], o: I): IO[Unit] =
4343
IO.delay {
44-
val previousCounter = aggRepository.getOrDefault(key(o), 0L)
45-
aggRepository.put(key(o), counter(o) + previousCounter)
44+
keys(o).foreach { value =>
45+
val previousCounter = aggRepository.getOrDefault(value, 0L)
46+
aggRepository.put(value, counter(o) + previousCounter)
47+
}
4648
}.void
4749

4850
def streamKeyValue(aggRepository: ConcurrentMap[K, Long]): fs2.Stream[IO, (K, Long)] =
@@ -61,4 +63,7 @@ object Aggregate {
6163
}.drain ++ streamKeyValue(aggRepository)
6264
}
6365
}
66+
67+
def aggregateByKey[I, K](feedContext: FeedContext[IO], key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
68+
aggregateByKeys(feedContext, key.andThen(List(_)), counter)
6469
}

src/main/scala/com/intenthq/action_processor/integrations/feeds/FeedContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ import com.intenthq.action_processor.integrations.config.MapDbSettings
66
import com.intenthq.embeddings.Mapping
77

88
case class FeedFilter(date: Option[LocalDate], time: Option[LocalTime])
9-
case class FeedContext[F[_]](embeddings: Option[Mapping[String, Array[Int], F]], filter: FeedFilter, mapDbSettings: MapDbSettings)
9+
case class FeedContext[F[_]](embeddings: Option[Mapping[String, List[String], F]], filter: FeedFilter, mapDbSettings: MapDbSettings)

src/test/scala/com/intenthq/action_processor/integrations/FeedAggregatedSpec.scala

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,43 +4,85 @@ import java.nio.charset.StandardCharsets
44
import java.time.LocalDateTime
55

66
import cats.effect.IO
7+
import cats.implicits.catsSyntaxApplicativeId
8+
import fs2.Pipe
9+
710
import com.intenthq.action_processor.integrations.aggregations.Aggregate
811
import com.intenthq.action_processor.integrations.feeds.{Feed, FeedContext}
912
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
10-
import fs2.Pipe
13+
import com.intenthq.embeddings.Mapping
14+
1115
import weaver.SimpleIOSuite
1216

1317
object FeedAggregatedSpec extends SimpleIOSuite {
1418

1519
test("should return a stream of aggregated csv feed rows") {
1620
val aggregatedFeed = new PersonsAggregatedByScoreFeed(
17-
Person("Peter", LocalDateTime.parse("2001-01-01T00:00:00"), 5),
18-
Person("Gabriela", LocalDateTime.parse("2002-01-01T00:00:00"), 7),
19-
Person("Jolie", LocalDateTime.parse("2003-01-01T00:00:00"), 4),
20-
Person("Peter", LocalDateTime.parse("2001-01-01T00:00:00"), 6)
21+
Person("Peter", LocalDateTime.parse("2000-01-01T00:00:00"), "music.com", 3),
22+
Person("Peter", LocalDateTime.parse("2000-01-01T00:00:00"), "rap.com", 5),
23+
Person("Peter", LocalDateTime.parse("2000-01-01T00:00:00"), "rock.com", 7),
24+
Person("Gabriela", LocalDateTime.parse("2000-01-01T00:00:00"), "music.com", 2),
25+
Person("Gabriela", LocalDateTime.parse("2000-01-01T00:00:00"), "rap.com", 10),
26+
Person("Gabriela", LocalDateTime.parse("2000-01-01T00:00:00"), "unknown.com", 10),
27+
Person("Jolie", LocalDateTime.parse("2000-01-01T00:00:00"), "rap.com", 1),
28+
Person("Jolie", LocalDateTime.parse("2000-01-01T00:00:00"), "rock.com", 7)
2129
)
2230

31+
val mapping = new Mapping[String, List[String], IO] {
32+
private val map = Map(
33+
"music.com" -> List("Music"),
34+
"rap.com" -> List("Music", "Rap"),
35+
"rock.com" -> List("Music", "Rock")
36+
)
37+
override def get(key: String): IO[Option[List[String]]] = map.get(key).pure
38+
}
39+
2340
val expectedResult: Set[String] = Set(
24-
"Peter,2001-01-01T00:00:00,11",
25-
"Gabriela,2002-01-01T00:00:00,7",
26-
"Jolie,2003-01-01T00:00:00,4"
41+
"Peter,2000-01-01T00:00:00,Music,15",
42+
"Peter,2000-01-01T00:00:00,Rap,5",
43+
"Peter,2000-01-01T00:00:00,Rock,7",
44+
"Gabriela,2000-01-01T00:00:00,Music,12",
45+
"Gabriela,2000-01-01T00:00:00,Rap,10",
46+
"Jolie,2000-01-01T00:00:00,Music,8",
47+
"Jolie,2000-01-01T00:00:00,Rock,7",
48+
"Jolie,2000-01-01T00:00:00,Rap,1"
2749
).map(_ + '\n')
2850

2951
for {
30-
feedStreamLinesBytes <- aggregatedFeed.stream(TestDefaults.feedContext).compile.toList
52+
feedStreamLinesBytes <- aggregatedFeed.stream(TestDefaults.feedContext.copy(embeddings = Some(mapping))).compile.toList
3153
feedStreamLines = feedStreamLinesBytes.map(bytes => new String(bytes, StandardCharsets.UTF_8)).toSet
3254
} yield expect(feedStreamLines == expectedResult)
3355
}
3456
}
3557

36-
case class Person(name: String, bornDate: LocalDateTime, score: Int)
37-
case class AggregatedPerson(name: String, bornDate: LocalDateTime)
58+
case class Person(name: String, timestamp: LocalDateTime, domain: String, count: Int)
59+
case class MappedPerson(name: String, timestamp: LocalDateTime, interests: List[String], count: Int)
60+
case class AggregatedPerson(name: String, timestamp: LocalDateTime, interest: String)
61+
62+
class PersonsAggregatedByScoreFeed(persons: Person*) extends Feed[MappedPerson, AggregatedPerson] {
63+
override def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, MappedPerson] =
64+
fs2
65+
.Stream(persons: _*)
66+
.through(mapPersons(feedContext))
3867

39-
class PersonsAggregatedByScoreFeed(persons: Person*) extends Feed[Person, AggregatedPerson] {
40-
override def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, Person] = fs2.Stream(persons: _*).covary[IO]
68+
private def mapPersons(feedContext: FeedContext[IO]): Pipe[IO, Person, MappedPerson] = { in =>
69+
fs2.Stream.eval(IO.fromOption(feedContext.embeddings)(new RuntimeException("Mapping required"))).flatMap { mappings =>
70+
in.evalMap { person =>
71+
mappings
72+
.get(person.domain)
73+
.map(
74+
_.map(interests => MappedPerson(person.name, person.timestamp, interests, person.count))
75+
)
76+
}.unNone
77+
}
78+
}
4179

42-
override def transform(feedContext: FeedContext[IO]): Pipe[IO, Person, (AggregatedPerson, Long)] =
43-
Aggregate.aggregateByKey[Person, AggregatedPerson](feedContext, person => AggregatedPerson(person.name, person.bornDate), _.score.toLong)
80+
override def transform(feedContext: FeedContext[IO]): Pipe[IO, MappedPerson, (AggregatedPerson, Long)] =
81+
Aggregate.aggregateByKeys[MappedPerson, AggregatedPerson](
82+
feedContext,
83+
person => person.interests.map(AggregatedPerson(person.name, person.timestamp, _)),
84+
_.count.toLong
85+
)
4486

4587
override def serialize(o: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((o, counter))
4688
}

src/test/scala/com/intenthq/action_processor/integrations/SQLFeedSpec.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@ import java.time.{Instant, LocalDate, LocalTime}
55

66
import cats.effect.{Async, Blocker, Bracket, ContextShift, IO, Resource}
77
import cats.implicits.toFunctorOps
8+
89
import com.intenthq.action_processor.integrations.aggregations.NoAggregate
910
import com.intenthq.action_processor.integrations.feeds.{FeedContext, SQLFeed}
1011
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
12+
1113
import doobie.h2.H2Transactor
1214
import doobie.implicits.{toConnectionIOOps, toSqlInterpolator}
1315
import doobie.util.query.Query0
1416
import doobie.util.transactor.Transactor
1517
import doobie.util.update.Update
1618
import doobie.implicits.javatime._
1719
import weaver.IOSuite
18-
19-
import scala.concurrent.ExecutionContextExecutor
20+
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
2021

2122
object SQLFeedSpec extends IOSuite with SQLFeedSpecResources {
2223

@@ -55,7 +56,7 @@ trait SQLFeedSpecResources { self: IOSuite =>
5556
for {
5657
blocker <- Blocker[IO]
5758
// Given a transactor for an exanpleRows fixture populated database
58-
transactor <- transactorResource[IO](blocker, ec)
59+
transactor <- transactorResource[IO](blocker, ExecutionContext.global)
5960
} yield transactor
6061

6162
private def transactorResource[F[_]: Async: ContextShift](blocker: Blocker, ec: ExecutionContextExecutor): Resource[F, H2Transactor[F]] = {

src/test/scala/com/intenthq/action_processor/integrations/serializations/csv/CsvSerializationSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,24 @@ object CsvSerializationSpec extends SimpleIOSuite {
2020
checkLine(Test("a"), "a")
2121
}
2222

23-
simpleTest("Serialize several fields case class") {
23+
pureTest("Serialize several fields case class") {
2424
case class Test(a: String, b: String, c: String)
2525
checkLine(Test("a", "b", "c"), "a,b,c")
2626
}
2727

28-
simpleTest("Serialize nested case class") {
28+
pureTest("Serialize nested case class") {
2929
case class Test1(a1: String, b1: Test2, c1: String)
3030
case class Test2(a2: String, b2: String, c2: String)
3131
checkLine(Test1("a1", Test2("a2", "b2", "c2"), "c1"), "a1,a2,b2,c2,c1")
3232
}
3333

34-
simpleTest("Serialize optional fields case class") {
34+
pureTest("Serialize optional fields case class") {
3535
case class Test(a: String, b: Option[String], c: String)
3636
checkLine(Test("a", Some("b"), "c"), "a,b,c") |+|
3737
checkLine(Test("a", None, "c"), "a,,c")
3838
}
3939

40-
simpleTest("Serialize primitive types") {
40+
pureTest("Serialize primitive types") {
4141
checkLine("a", "a") |+|
4242
checkLine(1, "1") |+|
4343
checkLine(1L, "1") |+|

0 commit comments

Comments
 (0)