Skip to content

Commit 8b19938

Browse files
committed
IC-40 Shows input stream progress
1 parent 6223ffa commit 8b19938

File tree

1 file changed

+5
-2
lines changed
  • src/main/scala/com/intenthq/action_processor/integrations/feeds

1 file changed

+5
-2
lines changed

src/main/scala/com/intenthq/action_processor/integrations/feeds/Feed.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import cats.effect.IO
66

77
trait Feed[I, O] {
88

9+
def toString(i: I): String = i.toString
10+
911
lazy val feedName: String = getClass.getSimpleName.stripSuffix("$")
1012
def date(feedContext: FeedContext[IO], clock: Clock = Clock.systemDefaultZone()): IO[LocalDate] =
1113
feedContext.filter.date.fold(IO.delay(java.time.LocalDate.now(clock)))(IO.pure)
@@ -14,9 +16,10 @@ trait Feed[I, O] {
1416
def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, I]
1517
def transform(feedContext: FeedContext[IO]): fs2.Pipe[IO, I, (O, Long)]
1618
def serialize(o: O, counter: Long): Array[Byte]
19+
val serialization: fs2.Pipe[IO, (O, Long), Array[Byte]] = _.map((serialize _).tupled)
1720

18-
final def stream(feedContext: FeedContext[IO]): fs2.Stream[IO, Array[Byte]] =
21+
def stream(feedContext: FeedContext[IO]): fs2.Stream[IO, Array[Byte]] =
1922
inputStream(feedContext)
2023
.through(transform(feedContext))
21-
.map((serialize _).tupled)
24+
.through(serialization)
2225
}

0 commit comments

Comments
 (0)