Skip to content

Commit 26ff43a

Browse files
fernandomoramerlinrabens
authored andcommitted
Fixes CsvFeedSpec
1 parent a768675 commit 26ff43a

File tree

7 files changed

+11
-11
lines changed

7 files changed

+11
-11
lines changed

src/main/scala/com/intenthq/action_processor/integrationsV2/CsvFeed.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ abstract class CsvFeed[O] extends Feed[Iterable[String], O] {
1717

1818
private def csvParse(line: String): IO[Iterable[String]] =
1919
Resource.fromAutoCloseable(IO.delay(new StringReader(line))).use { sr =>
20-
Option(csvReader.parse(sr))
21-
.flatMap(parser => Option(parser.nextRow()).map(_.getFields.asScala))
20+
Option(csvReader.parse(sr).nextRow())
21+
.map(_.getFields.asScala)
2222
.getOrElse(collection.mutable.Buffer.empty[String])
2323
.pure[IO]
2424
}
2525

2626
override def inputStream: Stream[IO, Iterable[String]] =
2727
rows.evalMap(csvParse)
28+
2829
}

src/main/scala/com/intenthq/action_processor/integrationsV2/HiveFeed.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import doobie.util.transactor.{Strategy, Transactor}
66

77
import scala.util.Properties
88

9-
abstract class HiveFeed[I, O] extends SqlFeed[I, O]("org.apache.hive.jdbc.HiveDriver") with TimeMeta with JavaLegacyTimeMeta {
9+
abstract class HiveFeed[I, O] extends SQLFeed[I, O]("org.apache.hive.jdbc.HiveDriver") with TimeMeta with JavaLegacyTimeMeta {
1010
override protected lazy val transactor: Transactor[IO] = Transactor.strategy.set(createTransactor, Strategy.void)
1111
override protected val jdbcUrl: String = Properties.envOrElse("HIVE_JDBC_URL", "jdbc:hive2://localhost:10000")
1212
}

src/main/scala/com/intenthq/action_processor/integrationsV2/LocalFileCsvFeed.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ trait LocalFileCsvFeed[O] extends CsvFeed[O] {
1818
.readAll[IO](Paths.get(localFilePath), blocker, 4096)
1919
.through(text.utf8Decode)
2020
.through(text.lines)
21+
.dropLastIf(_.isEmpty)
2122
}
2223
}

src/main/scala/com/intenthq/action_processor/integrationsV2/SqlFeed.scala renamed to src/main/scala/com/intenthq/action_processor/integrationsV2/SQLFeed.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import doobie.util.query.Query0
66
import doobie.util.transactor.Transactor
77
import doobie.util.transactor.Transactor.Aux
88

9-
abstract class SqlFeed[I, O](driver: String) extends Feed[I, O] {
9+
abstract class SQLFeed[I, O](driver: String) extends Feed[I, O] {
1010

1111
protected val jdbcUrl: String
1212
protected def query: Query0[I]

src/test/resources/persons.csv

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
"Peter"|"Big Street 1"|"5"
22
"Gabriela"|"Big Street 2"|"7"
33
"Jolie"|"Big Street 3"|"4"
4-
"Sam"|"Big Street 4"|"6"
4+
"Peter"|"Big Street 1"|"6"

src/test/scala/com/intenthq/action_processor/integrations/CsvFeedSpec.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ import weaver.SimpleIOSuite
55
object CsvFeedSpec extends SimpleIOSuite {
66

77
private val expectedResult: Set[String] = Set(
8-
"447722222222,20150620000000,apple.com,1",
9-
"447599999999,20150620000000,microsoft.co.uk,3"
8+
"Peter,Big Street 1,11",
9+
"Gabriela,Big Street 2,7",
10+
"Jolie,Big Street 3,4"
1011
).map(_ + '\n')
1112

1213
simpleTest("should return a stream of parsed csv feed rows") {

src/test/scala/com/intenthq/action_processor/integrations/ExampleLocalFileCsvFeed.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import java.nio.file.Paths
44

55
import cats.effect.IO
66
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
7-
import com.intenthq.action_processor.integrationsV2.{Aggregate, LocalFileCsvFeed, NoAggregate}
7+
import com.intenthq.action_processor.integrationsV2.{Aggregate, LocalFileCsvFeed}
88
import fs2.Pipe
99

1010
case class AggregatedPerson(name: String, address: String)
@@ -25,8 +25,5 @@ object ExampleLocalFileCsvFeed extends LocalFileCsvFeed[AggregatedPerson] {
2525
override def transform: Pipe[IO, Iterable[String], (AggregatedPerson, Long)] = Aggregate.aggregateByKey[Iterable[String], AggregatedPerson](key, counter)
2626

2727
override def serialize(a: AggregatedPerson, counter: Long): Array[Byte] = CsvSerialization.serialize((a, counter))
28-
}
2928

30-
object ExampleLocalFileCsvFeed2 extends LocalFileCsvFeed[Iterable[String]] with NoAggregate[Iterable[String]] {
31-
override def serialize(o: Iterable[String], counter: Long): Array[Byte] = CsvSerialization.serialize(o)
3229
}

0 commit comments

Comments
 (0)