Skip to content

Commit 34dbbab

Browse files
joserayamerlinrabens
authored andcommitted
PFM-684 'Feed::part' returns a String instead of a number
This will give more flexibility to the feeds to generate the part names
1 parent de45952 commit 34dbbab

File tree

2 files changed

+75
-1
lines changed
  • src
    • main/scala/com/intenthq/action_processor/integrations/feeds
    • test/scala/com/intenthq/action_processor/integrations

2 files changed

+75
-1
lines changed

src/main/scala/com/intenthq/action_processor/integrations/feeds/Feed.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,19 @@ trait Feed[I, O] {
1111

1212
def date(feedContext: FeedContext[IO], clock: Clock = Clock.systemDefaultZone()): IO[LocalDate] =
1313
feedContext.filter.date.fold(IO.delay(java.time.LocalDate.now(clock)))(IO.pure)
14-
def part(feedContext: FeedContext[IO]): Int = feedContext.filter.time.fold(0)(_.getHour)
14+
15+
def part(feedContext: FeedContext[IO]): String = {
16+
val timePart = f"${feedContext.filter.time.fold(0)(_.getHour)}%02d"
17+
if (feedContext.filter.partition.nonEmpty) {
18+
val partitionPart = feedContext.filter.partition.toList
19+
.sortBy(_._1)
20+
.map(_._2)
21+
.mkString("_")
22+
s"${partitionPart}_$timePart"
23+
.replaceAll("\\W", "_")
24+
} else
25+
timePart
26+
}
1527

1628
def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, I]
1729
def transform(feedContext: FeedContext[IO]): fs2.Pipe[IO, I, O]
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.intenthq.action_processor.integrations
2+
3+
import java.nio.charset.Charset
4+
import java.time.LocalDate
5+
import java.time.LocalTime
6+
7+
import scala.util.Random
8+
9+
import cats.effect.IO
10+
11+
import com.intenthq.action_processor.integrations.aggregations.NoAggregate
12+
import com.intenthq.action_processor.integrations.feeds.Feed
13+
import com.intenthq.action_processor.integrations.feeds.FeedContext
14+
import com.intenthq.action_processor.integrations.feeds.FeedFilter
15+
16+
import weaver.SimpleIOSuite
17+
18+
object FeedSpec extends SimpleIOSuite {
19+
pureTest("the part name is '00' by default") {
20+
val part = MinimalFeed.part(context(None, None, Map.empty))
21+
expect(part == "00")
22+
}
23+
24+
pureTest("the part name is the hour if there are no partitions") {
25+
val part = MinimalFeed.part(context(None, Some(LocalTime.parse("09:25")), Map.empty))
26+
expect(part == "09")
27+
}
28+
29+
pureTest("the part name contains the partition values if present (sorted by key)") {
30+
val part =
31+
MinimalFeed.part(context(None, Some(LocalTime.parse("09:25")), Map("month" -> "April", "city" -> "London")))
32+
33+
// London goes first because the order of keys is ('city', 'month')
34+
expect(part == "London_April_09")
35+
}
36+
37+
pureTest("the part name replaces invalid chars in the partition values") {
38+
val part =
39+
MinimalFeed.part(context(None, Some(LocalTime.parse("09:25")), Map("month" -> "*Ap?r'ilò")))
40+
expect(part == "_Ap_r_il__09")
41+
}
42+
43+
private def context(date: Option[LocalDate],
44+
time: Option[LocalTime],
45+
partition: Map[String, String]
46+
): FeedContext[IO] =
47+
TestDefaults
48+
.feedContext[IO]
49+
.copy(
50+
filter = FeedFilter(date, time, partition)
51+
)
52+
}
53+
54+
object MinimalFeed extends Feed[String, String] with NoAggregate[String] {
55+
override def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, String] =
56+
fs2.Stream
57+
.eval(IO(Random.alphanumeric.take(10).mkString))
58+
.repeat
59+
.take(20)
60+
61+
override def serialize(o: String): Array[Byte] = o.getBytes(Charset.defaultCharset())
62+
}

0 commit comments

Comments
 (0)