Skip to content

Commit b2ddd9b

Browse files
committed
IC-445 Adds fields encryption
1 parent 9cdd5ac commit b2ddd9b

File tree

19 files changed

+292
-71
lines changed

19 files changed

+292
-71
lines changed

.scalafmt.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version = 2.6.4
22
align.openParenCallSite = true
33
align.openParenDefnSite = true
4-
maxColumn = 160
4+
maxColumn = 120
55
continuationIndent.defnSite = 2
66
assumeStandardLibraryStripMargin = true
77
danglingParentheses.preset = true

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ lazy val root = (project in file("."))
1919
Compile / packageSrc / mappings := Seq(),
2020
testFrameworks += new TestFramework("weaver.framework.CatsEffect"),
2121
scalafmtOnCompile := true,
22+
Test / scalacOptions --= Seq("-Xfatal-warnings"),
2223
libraryDependencies ++= Seq(
2324
"co.fs2" %% "fs2-core" % "3.2.2",
2425
"co.fs2" %% "fs2-io" % "3.2.2",
26+
"com.google.crypto.tink" % "tink" % "1.6.1",
2527
"com.softwaremill.magnolia1_2" %% "magnolia" % "1.0.0-M7",
2628
"de.siegmar" % "fastcsv" % "1.0.3",
2729
"org.mapdb" % "mapdb" % "3.0.8",

src/main/scala/com/intenthq/StreamOps.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ import scala.concurrent.duration.FiniteDuration
88
import StreamOps._
99

1010
class StreamChunkOps[F[_], O](s: fs2.Stream[F, Chunk[O]]) {
11-
private val parallelismEnabled: Boolean = scala.util.Properties.envOrNone("FS2_PARALLELISM").map(_.toBoolean).getOrElse(true)
11+
private val parallelismEnabled: Boolean =
12+
scala.util.Properties.envOrNone("FS2_PARALLELISM").map(_.toBoolean).getOrElse(true)
1213

13-
def parEvalMapChunksUnordered[F2[x] >: F[x]: Concurrent, O2](parallelism: Int)(f: Chunk[O] => F2[Chunk[O2]]): Stream[F2, O2] =
14+
def parEvalMapChunksUnordered[F2[x] >: F[x]: Concurrent, O2](
15+
parallelism: Int
16+
)(f: Chunk[O] => F2[Chunk[O2]]): Stream[F2, O2] =
1417
if (!parallelismEnabled)
1518
s.flatMap(chunk => Stream.evalUnChunk(f(chunk)))
1619
else

src/main/scala/com/intenthq/action_processor/integrations/aggregations/Aggregate.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import scala.jdk.CollectionConverters._
1313

1414
object Aggregate {
1515

16-
def noop[I]: fs2.Pipe[IO, I, (I, Long)] = _.map(_ -> 1L)
16+
def noop[I]: fs2.Pipe[IO, I, I] = identity
1717

1818
private def loadAggRepository[K](mapDbSettings: MapDbSettings): Resource[IO, HTreeMap[K, Long]] = {
1919

@@ -32,28 +32,41 @@ object Aggregate {
3232
)
3333
}
3434

35-
def aggregateByKeys[I, K](feedContext: FeedContext[IO], keys: I => List[K], counter: I => Long): fs2.Pipe[IO, I, (K, Long)] =
35+
def aggregateByKeys[I, K](feedContext: FeedContext[IO],
36+
keys: I => List[K],
37+
counter: I => Long
38+
): fs2.Pipe[IO, I, (K, Long)] =
3639
sourceStream => {
3740

3841
// This pipe aggregates all the elemens and returns a single Map as an aggregate repository
3942
val aggregateInRepository: fs2.Pipe[IO, I, ConcurrentMap[K, Long]] =
4043
in => {
4144
fs2.Stream
42-
.resource[IO, ConcurrentMap[K, Long]](loadAggRepository(feedContext.mapDbSettings))
45+
.resource[IO, ConcurrentMap[K, Long]](
46+
loadAggRepository(feedContext.mapDbSettings)
47+
)
4348
.flatMap { aggRepository =>
4449
fs2.Stream.exec(IO.delay(println("Starting aggregation"))) ++
4550
in.evalMapChunk { o =>
4651
IO.delay {
4752
keys(o).foreach { value =>
48-
val previousCounter = aggRepository.getOrDefault(value, 0L)
53+
val previousCounter =
54+
aggRepository.getOrDefault(value, 0L)
4955
aggRepository.put(value, counter(o) + previousCounter)
5056
}
5157
aggRepository
5258
}
5359
}
5460
// Returns last aggRepository with the counter of elements
55-
.fold((aggRepository, 0L)) { case ((_, previousRows), aggRepository) => (aggRepository, previousRows + 1) }
56-
.evalMapChunk { case (aggRepository, n) => IO.delay(println(s"Finished aggregation of $n rows")).as(aggRepository) }
61+
.fold((aggRepository, 0L)) {
62+
case ((_, previousRows), aggRepository) =>
63+
(aggRepository, previousRows + 1)
64+
}
65+
.evalMapChunk {
66+
case (aggRepository, n) =>
67+
IO.delay(println(s"Finished aggregation of $n rows"))
68+
.as(aggRepository)
69+
}
5770
}
5871
}
5972

src/main/scala/com/intenthq/action_processor/integrations/aggregations/NoAggregate.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ import scala.annotation.unused
77

88
trait NoAggregate[I] {
99
self: Feed[I, I] =>
10-
override def transform(@unused feedContext: FeedContext[IO]): fs2.Pipe[IO, I, (I, Long)] = Aggregate.noop
10+
override def transform(@unused feedContext: FeedContext[IO]): fs2.Pipe[IO, I, I] = Aggregate.noop
1111
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.intenthq.action_processor.integrations.encryption
2+
3+
import cats.syntax.all._
4+
import cats.{ApplicativeError, MonadError}
5+
import com.google.crypto.tink.subtle.AesSiv
6+
import com.intenthq.action_processor.integrations.encryption.ByteArrayStringOps._
7+
import com.intenthq.action_processor.integrations.feeds.Feed
8+
9+
import java.nio.charset.StandardCharsets.UTF_8
10+
import java.util.Base64
11+
12+
case class EncryptionKey private (bytes: Array[Byte]) {
13+
private val aesSiv: AesSiv = new AesSiv(bytes)
14+
def encrypt[F[_]: ApplicativeError[*[_], Throwable]](string: String): F[String] =
15+
ApplicativeError[F, Throwable].catchNonFatal(
16+
aesSiv.encryptDeterministically(string.getBytes(UTF_8), Array.emptyByteArray).asBase64StringFromBytes
17+
)
18+
def decrypt[F[_]: MonadError[*[_], Throwable]](base64String: String): F[String] =
19+
for {
20+
bytes <- base64String.asBytesFromBase64String[F]
21+
decrypted <-
22+
ApplicativeError[F, Throwable].catchNonFatal(aesSiv.decryptDeterministically(bytes, Array.emptyByteArray))
23+
} yield new String(decrypted, UTF_8)
24+
}
25+
26+
object EncryptionKey {
27+
def fromBytes(bytes: Array[Byte]): Either[Throwable, EncryptionKey] =
28+
Either.catchNonFatal(EncryptionKey(bytes))
29+
def fromBase64String[F[_]: MonadError[*[_], Throwable]](base64String: String): F[EncryptionKey] =
30+
for {
31+
bytes <- base64String.asBytesFromBase64String[F]
32+
encryptionKey <- ApplicativeError[F, Throwable].catchNonFatal(EncryptionKey(bytes))
33+
} yield encryptionKey
34+
}
35+
36+
object ByteArrayStringOps {
37+
implicit class StringSyntax(val string: String) extends AnyVal {
38+
def asBytesFromBase64String[F[_]: ApplicativeError[*[_], Throwable]]: F[Array[Byte]] =
39+
ApplicativeError[F, Throwable].catchNonFatal(Base64.getDecoder.decode(string))
40+
}
41+
42+
implicit class ByteArraySyntax(val bytes: Array[Byte]) extends AnyVal {
43+
def asBase64StringFromBytes: String = Base64.getEncoder.withoutPadding().encodeToString(bytes)
44+
}
45+
}
46+
47+
class EncryptionKeyNotFound(feedName: String)
48+
extends Exception(s"Encryption key not provided. Required in feed $feedName")
49+
50+
trait EncryptionForFeed[I, O] { self: Feed[I, O] =>
51+
def encryptedField[F[_]: ApplicativeError[*[_], Throwable], T](
52+
maybeEncryptionKey: Option[EncryptionKey]
53+
)(fieldToEncrypt: I => String)(setEncryptedField: (I, String) => O): fs2.Pipe[F, I, O] =
54+
maybeEncryptionKey match {
55+
case None => _ => fs2.Stream.raiseError[F](new EncryptionKeyNotFound(feedName))
56+
case Some(encryptionKey) =>
57+
_.evalMap(input =>
58+
encryptionKey.encrypt(fieldToEncrypt(input)).map(encrypted => setEncryptedField(input, encrypted))
59+
)
60+
}
61+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.intenthq.action_processor.integrations.encryption
2+
3+
object EncryptionKeyGeneratorApp extends App {
4+
private val KEY_SIZE_IN_BYTES = 64
5+
private val random = {
6+
val random = new java.security.SecureRandom()
7+
random.nextLong() // force seeding
8+
random
9+
}
10+
11+
def newRandomKey(): Array[Byte] = {
12+
val bytes = Array.ofDim[Byte](KEY_SIZE_IN_BYTES)
13+
random.nextBytes(bytes)
14+
bytes
15+
}
16+
17+
println(java.util.Base64.getEncoder.withoutPadding().encodeToString(newRandomKey()))
18+
}

src/main/scala/com/intenthq/action_processor/integrations/feeds/CSVFeed.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import fs2.Stream
99

1010
import scala.jdk.CollectionConverters._
1111

12-
trait ParseCSVInput[O] extends Feed[Iterable[String], O] {
12+
trait CSVFeed[O] extends Feed[Iterable[String], O] {
1313

1414
protected lazy val csvReader: CsvReader = new CsvReader
1515

src/main/scala/com/intenthq/action_processor/integrations/feeds/Feed.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@ import cats.effect.IO
77
trait Feed[I, O] {
88

99
def toString(i: I): String = i.toString
10-
1110
lazy val feedName: String = getClass.getSimpleName.stripSuffix("$")
11+
1212
def date(feedContext: FeedContext[IO], clock: Clock = Clock.systemDefaultZone()): IO[LocalDate] =
1313
feedContext.filter.date.fold(IO.delay(java.time.LocalDate.now(clock)))(IO.pure)
1414
def part(feedContext: FeedContext[IO]): Int = feedContext.filter.time.fold(0)(_.getHour)
1515

1616
def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, I]
17-
def transform(feedContext: FeedContext[IO]): fs2.Pipe[IO, I, (O, Long)]
18-
def serialize(o: O, counter: Long): Array[Byte]
19-
val serialization: fs2.Pipe[IO, (O, Long), Array[Byte]] = _.map((serialize _).tupled)
17+
def transform(feedContext: FeedContext[IO]): fs2.Pipe[IO, I, O]
18+
def serialize(o: O): Array[Byte]
19+
val serialization: fs2.Pipe[IO, O, Array[Byte]] = _.map(serialize)
2020

2121
def stream(feedContext: FeedContext[IO]): fs2.Stream[IO, Array[Byte]] =
2222
inputStream(feedContext)
Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
package com.intenthq.action_processor.integrations.feeds
22

3-
import java.time.{LocalDate, LocalTime}
4-
53
import com.intenthq.action_processor.integrations.config.MapDbSettings
4+
import com.intenthq.action_processor.integrations.encryption.EncryptionKey
65
import com.intenthq.embeddings.Mapping
76

7+
import java.time.{LocalDate, LocalTime}
8+
89
case class FeedFilter(date: Option[LocalDate], time: Option[LocalTime])
9-
case class FeedContext[F[_]](embeddings: Option[Mapping[String, List[String], F]], filter: FeedFilter, mapDbSettings: MapDbSettings)
10+
object FeedFilter {
11+
val empty: FeedFilter = FeedFilter(None, None)
12+
}
13+
case class FeedContext[F[_]](embeddings: Option[Mapping[String, List[String], F]],
14+
filter: FeedFilter,
15+
mapDbSettings: MapDbSettings,
16+
encryptionKey: Option[EncryptionKey]
17+
)

src/main/scala/com/intenthq/action_processor/integrations/feeds/PostgresFeed.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ import scala.util.Properties
66

77
trait PostgresFeed[I, O] extends SQLFeed[I, O] with PgDoobieImplicits {
88
override val driver: String = "org.postgresql.Driver"
9-
override protected val jdbcUrl: String = Properties.envOrElse("POSTGRESQL_JDBC_URL", "jdbc:postgresql://localhost:5432/database")
9+
override protected val jdbcUrl: String =
10+
Properties.envOrElse("POSTGRESQL_JDBC_URL", "jdbc:postgresql://localhost:5432/database")
1011
}

src/main/scala/com/intenthq/action_processor/integrations/repositories/MapDBRepository.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ object MapDBRepository {
1313
def load(mapDBSettings: MapDbSettings): Resource[IO, DB] = {
1414
val dbInitOp = for {
1515
now <- IO.delay(LocalDateTime.now())
16-
dbFile <- IO.delay(new File(Paths.get(mapDBSettings.dbPath.toString, s"dbb-${now.toLocalDate}-${now.toLocalTime}").toUri))
16+
dbFile <-
17+
IO.delay(new File(Paths.get(mapDBSettings.dbPath.toString, s"dbb-${now.toLocalDate}-${now.toLocalTime}").toUri))
1718
createDb <- IO.blocking {
1819
DBMaker
1920
.fileDB(dbFile.getAbsolutePath)
@@ -26,6 +27,8 @@ object MapDBRepository {
2627
.make()
2728
}
2829
} yield (createDb, dbFile)
29-
Resource.make(dbInitOp)(db => IO.delay(db._1.close()).guarantee(IO.delay(Files.deleteIfExists(db._2.toPath)).void)).map(_._1)
30+
Resource
31+
.make(dbInitOp)(db => IO.delay(db._1.close()).guarantee(IO.delay(Files.deleteIfExists(db._2.toPath)).void))
32+
.map(_._1)
3033
}
3134
}

src/main/scala/com/intenthq/action_processor/integrations/serializations/csv/Csv.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ object Csv {
1313

1414
type Typeclass[A] = Csv[A]
1515

16-
def join[A](ctx: CaseClass[Csv, A]): Csv[A] = (a: A) => ctx.parameters.flatMap(p => p.typeclass.toCSV(p.dereference(a)))
16+
def join[A](ctx: CaseClass[Csv, A]): Csv[A] =
17+
(a: A) => ctx.parameters.flatMap(p => p.typeclass.toCSV(p.dereference(a)))
1718

1819
def split[A](ctx: SealedTrait[Csv, A]): Csv[A] = (a: A) => ctx.split(a)(sub => sub.typeclass.toCSV(sub.cast(a)))
1920

@@ -26,10 +27,17 @@ object Csv {
2627
implicit val csvDouble: Csv[Double] = (a: Double) => Seq(a.toString)
2728
implicit val csvBigDecimal: Csv[BigDecimal] = (a: BigDecimal) => Seq(a.toString)
2829
implicit val csvBoolean: Csv[Boolean] = (a: Boolean) => Seq(a.toString)
29-
implicit val csvLocalDate: Csv[LocalDate] = (a: LocalDate) => Seq(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd").format(a))
30-
implicit val csvLocalTime: Csv[LocalTime] = (a: LocalTime) => Seq(java.time.format.DateTimeFormatter.ofPattern("HH:mm:ss").format(a))
30+
implicit val csvLocalDate: Csv[LocalDate] = (a: LocalDate) =>
31+
Seq(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd").format(a))
32+
implicit val csvLocalTime: Csv[LocalTime] = (a: LocalTime) =>
33+
Seq(java.time.format.DateTimeFormatter.ofPattern("HH:mm:ss").format(a))
3134
implicit val csvInstant: Csv[Instant] = (a: Instant) => Seq(a.toString)
32-
implicit val csvLocalDateTime: Csv[LocalDateTime] = (a: LocalDateTime) => Seq(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss").format(a))
35+
implicit val csvLocalDateTime: Csv[LocalDateTime] = (a: LocalDateTime) =>
36+
Seq(
37+
java.time.format.DateTimeFormatter
38+
.ofPattern("yyyy-MM-dd'T'HH:mm:ss")
39+
.format(a)
40+
)
3341

3442
implicit def deriveCsv[A]: Csv[A] = macro Magnolia.gen[A]
3543
}

src/test/scala/com/intenthq/action_processor/integrations/CSVFeedSpec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ trait CSVFeedSpecResources { self: IOSuite =>
6262
}
6363
}
6464

65-
class ExampleLocalFileSource(override val localFilePath: String) extends LocalFileFeed[Iterable[String], Iterable[String]] with NoAggregate[Iterable[String]] {
65+
class ExampleLocalFileSource(override val localFilePath: String)
66+
extends LocalFileFeed[Iterable[String], Iterable[String]]
67+
with NoAggregate[Iterable[String]] {
6668

6769
protected val parseInput: Pipe[IO, Byte, Iterable[String]] = ParseCSVInput.parseInput[IO]('|')
6870

69-
override def serialize(o: Iterable[String], counter: Long): Array[Byte] = CsvSerialization.columnsAsCsv(o)
71+
override def serialize(o: Iterable[String]): Array[Byte] = CsvSerialization.columnsAsCsv(o)
7072
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.intenthq.action_processor.integrations
2+
3+
import cats.effect.IO
4+
import cats.syntax.all._
5+
import com.intenthq.action_processor.integrations.config.MapDbSettings
6+
import com.intenthq.action_processor.integrations.encryption.{EncryptionForFeed, EncryptionKey}
7+
import com.intenthq.action_processor.integrations.feeds.{Feed, FeedContext, FeedFilter}
8+
import com.intenthq.action_processor.integrations.serializations.csv.Csv._
9+
import com.intenthq.action_processor.integrations.serializations.csv.CsvSerialization
10+
import fs2.Pipe
11+
import weaver.SimpleIOSuite
12+
13+
import java.nio.charset.StandardCharsets.UTF_8
14+
15+
object EncryptionSpec extends SimpleIOSuite {
16+
17+
case class User(userId: String, age: Int)
18+
case class EncryptedUser(encryptedUserId: String, age: Int)
19+
20+
class EncryptedFeed(users: User*) extends Feed[User, User] with EncryptionForFeed[User, User] {
21+
override def inputStream(feedContext: FeedContext[IO]): fs2.Stream[IO, User] = fs2.Stream.emits(users)
22+
override def transform(feedContext: FeedContext[IO]): Pipe[IO, User, User] =
23+
encryptedField(feedContext.encryptionKey)(user => user.userId)((user, encrypted) => user.copy(userId = encrypted))
24+
override def serialize(o: User): Array[Byte] = CsvSerialization.serialize(o)
25+
}
26+
27+
test("A feed with a given encryption key uses it to encrypt a given column") {
28+
for {
29+
encryptionKey <-
30+
EncryptionKey
31+
.fromBase64String(
32+
"L8zhPwpQ47adVtfd6Ool8Xr3Yhl4EGXqd3pV86dVu6AXT4OOZrjz72h1B/Bx5BlHkSC5LajTmFayPPv5dtRDdA"
33+
)
34+
feedContext = FeedContext[IO](embeddings = None,
35+
filter = FeedFilter.empty,
36+
mapDbSettings = MapDbSettings.Default,
37+
encryptionKey = Some(encryptionKey)
38+
)
39+
sourceUsers = List(User("userid-1", 1), User("userid-2", 2), User("userid-3", 3))
40+
feed = new EncryptedFeed(sourceUsers: _*)
41+
encryptedUsers = List(User("9/krxGDVzUhCibNRrE8XqA+HoLZo1NLk", 1),
42+
User("2pOZaBpNhBjlHIR9RGkoye9vWsUGN7LA", 2),
43+
User("B7gpZU+FFISFCAY0EBL5J/kId6Mh602c", 3)
44+
)
45+
expectedEncryptedUser = encryptedUsers.map(user => new String(CsvSerialization.serialize(user), UTF_8))
46+
encryptedLines <- feed.stream(feedContext).map(new String(_, UTF_8)).compile.toList
47+
decryptedUsers <- encryptedUsers.traverse(user =>
48+
encryptionKey.decrypt(user.userId).map(decrypted => user.copy(userId = decrypted))
49+
)
50+
} yield expect(encryptedLines == expectedEncryptedUser) and expect(decryptedUsers == sourceUsers)
51+
}
52+
}

0 commit comments

Comments
 (0)