Skip to content

Commit cb0d07d

Browse files
Merlin Rabens (merlinrabens) authored and committed
Replace SourceContext by FeedContext
1 parent 972ae83 commit cb0d07d

File tree

8 files changed

+35
-19
lines changed

8 files changed

+35
-19
lines changed

src/main/scala/com/intenthq/action_processor/integrationsV2/aggregations/Aggregate.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ object Aggregate {
1111
def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)
1212

1313
private def loadAggRepository[K]: Resource[IO, ConcurrentMap[K, Long]] =
14+
// TODO: Insert MapDB here ...
1415
Resource.liftF(IO.pure(new ConcurrentHashMap[K, Long]()))
1516

1617
def aggregateByKey[I, K](key: I => K, counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =

src/main/scala/com/intenthq/action_processor/integrationsV2/feeds/CSVFeed.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import java.io.StringReader
44

55
import cats.effect.{IO, Resource}
66
import cats.implicits.catsSyntaxApplicativeId
7-
import com.intenthq.action_processor.integrations.SourceContext
87
import de.siegmar.fastcsv.reader.CsvReader
98
import fs2.Stream
109

@@ -24,7 +23,6 @@ trait CSVFeed[O] extends Feed[Iterable[String], O] {
2423
.pure[IO]
2524
}
2625

27-
override def inputStream(sourceContext: SourceContext[IO]): Stream[IO, Iterable[String]] =
26+
override def inputStream(feedContext: FeedContext[IO]): Stream[IO, Iterable[String]] =
2827
rows.evalMap(csvParse)
29-
3028
}
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
package com.intenthq.action_processor.integrationsV2.feeds
22

33
import cats.effect.IO
4-
import com.intenthq.action_processor.integrations.SourceContext
54

65
trait Feed[I, O] {
76

8-
def inputStream(sourceContext: SourceContext[IO]): fs2.Stream[IO, I]
7+
def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, I]
98
def transform: fs2.Pipe[IO, I, (O, Long)]
109
def serialize(o: O, counter: Long): Array[Byte]
1110

12-
final def stream(sourceContext: SourceContext[IO]): fs2.Stream[IO, Array[Byte]] =
13-
inputStream(sourceContext)
11+
final def stream(feedContext: FeedContext[IO]): fs2.Stream[IO, Array[Byte]] =
12+
inputStream(feedContext)
1413
.through(transform)
1514
.map((serialize _).tupled)
1615
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.intenthq.action_processor.integrationsV2.feeds
2+
3+
import java.time.{LocalDate, LocalTime}
4+
5+
import com.intenthq.action_processor.integrationsV2.config.MapDBSettings
6+
import com.intenthq.embeddings.Mapping
7+
8+
case class FeedFilter(date: Option[LocalDate], time: Option[LocalTime])
9+
10+
object FeedFilter {
11+
val empty: FeedFilter = FeedFilter(None, None)
12+
}
13+
14+
case class FeedContext[F[_]](embeddings: Option[Mapping[String, Array[Int], F]], filter: FeedFilter, mapDbSettings: MapDBSettings)
15+
16+
object FeedContext {
17+
def empty[F[_]]: FeedContext[F] = FeedContext[F](None, FeedFilter.empty, MapDBSettings.Default)
18+
}
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.intenthq.action_processor.integrationsV2.feeds
22

33
import cats.effect.{ContextShift, IO}
4-
import com.intenthq.action_processor.integrations.{DoobieImplicits, SourceContext}
4+
import com.intenthq.action_processor.integrations.DoobieImplicits
55
import doobie.util.query.Query0
66
import doobie.util.transactor.Transactor
77
import doobie.util.transactor.Transactor.Aux
@@ -10,7 +10,7 @@ trait SQLFeed[I, O] extends Feed[I, O] with DoobieImplicits {
1010

1111
protected val driver: String
1212
protected val jdbcUrl: String
13-
protected def query(sourceContext: SourceContext[IO]): Query0[I]
13+
protected def query(sourceContext: FeedContext[IO]): Query0[I]
1414

1515
implicit private val contextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
1616

@@ -19,8 +19,8 @@ trait SQLFeed[I, O] extends Feed[I, O] with DoobieImplicits {
1919

2020
protected def createTransactor: Aux[IO, Unit] = Transactor.fromDriverManager[IO](driver, jdbcUrl)
2121

22-
override def inputStream(sourceContext: SourceContext[IO]): fs2.Stream[IO, I] =
23-
query(sourceContext)
22+
override def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, I] =
23+
query(feedContext)
2424
.streamWithChunkSize(chunkSize)
2525
.transact[IO](transactor)
2626
}

src/test/scala/com/intenthq/action_processor/integrations/CSVFeedSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import cats.effect.{IO, Resource}
88
import cats.implicits._
99
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
1010
import com.intenthq.action_processor.integrationsV2.aggregations.NoAggregate
11-
import com.intenthq.action_processor.integrationsV2.feeds.LocalFileCSVFeed
11+
import com.intenthq.action_processor.integrationsV2.feeds.{FeedContext, LocalFileCSVFeed}
1212
import weaver.IOSuite
1313

1414
object CSVFeedSpec extends IOSuite with CSVFeedSpecResources {
@@ -29,7 +29,7 @@ object CSVFeedSpec extends IOSuite with CSVFeedSpecResources {
2929
).map(_ + '\n')
3030

3131
for {
32-
feedStreamLinesBytes <- resources.csvFeed.stream(SourceContext.empty).compile.toList
32+
feedStreamLinesBytes <- resources.csvFeed.stream(FeedContext.empty).compile.toList
3333
feedStreamLines = feedStreamLinesBytes.map(bytes => new String(bytes, StandardCharsets.UTF_8)).toSet
3434
} yield expect(feedStreamLines == expectedResult)
3535
}

src/test/scala/com/intenthq/action_processor/integrations/FeedAggregatedSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import java.nio.charset.StandardCharsets
55
import cats.effect.IO
66
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
77
import com.intenthq.action_processor.integrationsV2.aggregations.Aggregate
8-
import com.intenthq.action_processor.integrationsV2.feeds.Feed
8+
import com.intenthq.action_processor.integrationsV2.feeds.{Feed, FeedContext}
99
import fs2.Pipe
1010
import weaver.SimpleIOSuite
1111

@@ -26,7 +26,7 @@ object FeedAggregatedSpec extends SimpleIOSuite {
2626
).map(_ + '\n')
2727

2828
for {
29-
feedStreamLinesBytes <- aggregatedFeed.stream(SourceContext.empty).compile.toList
29+
feedStreamLinesBytes <- aggregatedFeed.stream(FeedContext.empty).compile.toList
3030
feedStreamLines = feedStreamLinesBytes.map(bytes => new String(bytes, StandardCharsets.UTF_8)).toSet
3131
} yield expect(feedStreamLines == expectedResult)
3232
}
@@ -36,7 +36,7 @@ case class Person(name: String, address: String, score: Int)
3636
case class AggregatedPerson(name: String, address: String)
3737

3838
class PersonsAggregatedByScoreFeed(persons: Person*) extends Feed[Person, AggregatedPerson] {
39-
override def inputStream(sourceContext: SourceContext[IO]): fs2.Stream[IO, Person] = fs2.Stream(persons: _*).covary[IO]
39+
override def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, Person] = fs2.Stream(persons: _*).covary[IO]
4040

4141
override def transform: Pipe[IO, Person, (AggregatedPerson, Long)] =
4242
Aggregate.aggregateByKey[Person, AggregatedPerson](person => AggregatedPerson(person.name, person.address), _.score.toLong)

src/test/scala/com/intenthq/action_processor/integrations/SQLFeedSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import cats.effect.{Async, Blocker, Bracket, ContextShift, IO, Resource}
77
import cats.implicits._
88
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
99
import com.intenthq.action_processor.integrationsV2.aggregations.NoAggregate
10-
import com.intenthq.action_processor.integrationsV2.feeds.SQLFeed
10+
import com.intenthq.action_processor.integrationsV2.feeds.{FeedContext, SQLFeed}
1111
import doobie.h2.H2Transactor
1212
import doobie.implicits._
1313
import doobie.implicits.javatime._
@@ -38,7 +38,7 @@ object SQLFeedSpec extends IOSuite with SQLFeedSpecResources {
3838

3939
test("should return a stream of parsed ExampleFeedRow") { _ =>
4040
for {
41-
feedStreamLinesBytes <- ExampleCsvFeed.stream(SourceContext.empty).compile.toList
41+
feedStreamLinesBytes <- ExampleCsvFeed.stream(FeedContext.empty).compile.toList
4242
feedStreamLines = feedStreamLinesBytes.map(new String(_))
4343
expectedOutput = exampleRows.map(CsvSerialization.serialize[ExampleCsvFeedRow]).map(new String(_))
4444
} yield expect(feedStreamLines == expectedOutput)
@@ -112,7 +112,7 @@ abstract class H2Source[I, O] extends SQLFeed[I, O] with TimeMeta with JavaTimeM
112112

113113
object ExampleCsvFeed extends H2Source[ExampleCsvFeedRow, ExampleCsvFeedRow] with NoAggregate[ExampleCsvFeedRow] {
114114

115-
override def query(context: SourceContext[IO]): Query0[ExampleCsvFeedRow] =
115+
override def query(context: FeedContext[IO]): Query0[ExampleCsvFeedRow] =
116116
(
117117
sql"""SELECT integer, bigint, float, double, decimal, numeric, bit, varchar, date, time, timestamp
118118
|FROM example""".stripMargin ++

0 commit comments

Comments (0)