From 80120ef59ed7bb7606a33b7136344363989784bb Mon Sep 17 00:00:00 2001 From: lchen-2101 <73617864+lchen-2101@users.noreply.github.com> Date: Mon, 6 Dec 2021 18:54:15 -0500 Subject: [PATCH] dead letter serialization fix --- common/src/main/protobuf/raw.data.events.proto | 5 +++++ common/src/main/resources/serialization.conf | 1 + .../submission/HmdaRawDataCommands.scala | 2 +- .../messages/submission/HmdaRawDataEvents.scala | 1 + .../processing/state/HmdaRawDataState.scala | 4 +++- .../HmdaRawDataEventsProtobufConverter.scala | 16 ++++++++++++++-- .../submission/HmdaRawDataEventsSerializer.scala | 10 +++++++--- .../http/filing/submissions/UploadHttpApi.scala | 9 ++++----- .../persistence/submission/HmdaRawData.scala | 5 +++-- .../persistence/submission/HmdaRawDataSpec.scala | 8 +++----- .../submission/HmdaValidationErrorSpec.scala | 4 ++-- 11 files changed, 44 insertions(+), 21 deletions(-) diff --git a/common/src/main/protobuf/raw.data.events.proto b/common/src/main/protobuf/raw.data.events.proto index cdb007ef12..14277269b6 100644 --- a/common/src/main/protobuf/raw.data.events.proto +++ b/common/src/main/protobuf/raw.data.events.proto @@ -8,6 +8,11 @@ message LineAddedMessage { string data = 2; } +message LinesAddedMessage { + int64 timestamp = 1; + repeated string data = 2; +} + message HmdaRawDataStateMessage { int32 size = 1; } \ No newline at end of file diff --git a/common/src/main/resources/serialization.conf b/common/src/main/resources/serialization.conf index d3699da560..f052300543 100644 --- a/common/src/main/resources/serialization.conf +++ b/common/src/main/resources/serialization.conf @@ -135,6 +135,7 @@ akka { "hmda.messages.submission.HmdaRawDataCommands$AddLines" = hmda-raw-commands "hmda.messages.submission.HmdaRawDataEvents$LineAdded" = hmda-raw-events + "hmda.messages.submission.HmdaRawDataEvents$LinesAdded" = hmda-raw-events "hmda.model.processing.state.HmdaRawDataState" = hmda-raw-events } diff --git a/common/src/main/scala/hmda/messages/submission/HmdaRawDataCommands.scala b/common/src/main/scala/hmda/messages/submission/HmdaRawDataCommands.scala index 0480ee5017..29fee14588 100644 --- a/common/src/main/scala/hmda/messages/submission/HmdaRawDataCommands.scala +++ b/common/src/main/scala/hmda/messages/submission/HmdaRawDataCommands.scala @@ -9,7 +9,7 @@ object HmdaRawDataCommands { sealed trait HmdaRawDataCommand extends Command - case class AddLines(submissionId: SubmissionId, timestamp: Long, data: Seq[String], maybeReplyTo: Option[ActorRef[Seq[HmdaRawDataEvent]]]) + case class AddLines(submissionId: SubmissionId, timestamp: Long, data: Seq[String], maybeReplyTo: Option[ActorRef[HmdaRawDataEvent]]) extends HmdaRawDataCommand case object StopRawData extends HmdaRawDataCommand diff --git a/common/src/main/scala/hmda/messages/submission/HmdaRawDataEvents.scala b/common/src/main/scala/hmda/messages/submission/HmdaRawDataEvents.scala index c952b34364..8859db4d15 100644 --- a/common/src/main/scala/hmda/messages/submission/HmdaRawDataEvents.scala +++ b/common/src/main/scala/hmda/messages/submission/HmdaRawDataEvents.scala @@ -5,4 +5,5 @@ import hmda.messages.CommonMessages.Event object HmdaRawDataEvents { sealed trait HmdaRawDataEvent extends Event case class LineAdded(timestamp: Long, data: String) extends HmdaRawDataEvent + case class LinesAdded(timestamp: Long, data: Seq[String]) extends HmdaRawDataEvent } diff --git a/common/src/main/scala/hmda/model/processing/state/HmdaRawDataState.scala b/common/src/main/scala/hmda/model/processing/state/HmdaRawDataState.scala index ef98a78117..903aa729c2 100644 --- a/common/src/main/scala/hmda/model/processing/state/HmdaRawDataState.scala +++ b/common/src/main/scala/hmda/model/processing/state/HmdaRawDataState.scala @@ -1,11 +1,13 @@ package hmda.model.processing.state -import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded } +import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded, LinesAdded } case class HmdaRawDataState(size: Int = 0) { def update(event: HmdaRawDataEvent): HmdaRawDataState = event match { case LineAdded(_, _) => HmdaRawDataState(size + 1) + case LinesAdded(_, data) => + HmdaRawDataState(size + data.size) case _ => this } } diff --git a/common/src/main/scala/hmda/serialization/submission/HmdaRawDataEventsProtobufConverter.scala b/common/src/main/scala/hmda/serialization/submission/HmdaRawDataEventsProtobufConverter.scala index 038879b557..997e7621bc 100644 --- a/common/src/main/scala/hmda/serialization/submission/HmdaRawDataEventsProtobufConverter.scala +++ b/common/src/main/scala/hmda/serialization/submission/HmdaRawDataEventsProtobufConverter.scala @@ -1,8 +1,8 @@ package hmda.serialization.submission -import hmda.messages.submission.HmdaRawDataEvents.LineAdded +import hmda.messages.submission.HmdaRawDataEvents.{ LineAdded, LinesAdded } import hmda.model.processing.state.HmdaRawDataState -import hmda.persistence.serialization.raw.data.events.{ HmdaRawDataStateMessage, LineAddedMessage } +import hmda.persistence.serialization.raw.data.events.{ HmdaRawDataStateMessage, LineAddedMessage, LinesAddedMessage } object HmdaRawDataEventsProtobufConverter { @@ -12,12 +12,24 @@ object HmdaRawDataEventsProtobufConverter { evt.data ) + def linesAddedToProtobuf(evt: LinesAdded): LinesAddedMessage = + LinesAddedMessage( + evt.timestamp, + evt.data + ) + def lineAddedFromProtobuf(msg: LineAddedMessage): LineAdded = LineAdded( msg.timestamp, msg.data ) + def linesAddedFromProtobuf(msg: LinesAddedMessage): LinesAdded = + LinesAdded( + msg.timestamp, + msg.data + ) + def rawDataStateToProtobuf(evt: HmdaRawDataState): HmdaRawDataStateMessage = HmdaRawDataStateMessage( evt.size diff --git a/common/src/main/scala/hmda/serialization/submission/HmdaRawDataEventsSerializer.scala b/common/src/main/scala/hmda/serialization/submission/HmdaRawDataEventsSerializer.scala index 9af463e151..eb8d5d978c 100644 --- a/common/src/main/scala/hmda/serialization/submission/HmdaRawDataEventsSerializer.scala +++ b/common/src/main/scala/hmda/serialization/submission/HmdaRawDataEventsSerializer.scala @@ -1,17 +1,17 @@ package hmda.serialization.submission import java.io.NotSerializableException - import akka.serialization.SerializerWithStringManifest -import hmda.messages.submission.HmdaRawDataEvents.LineAdded +import hmda.messages.submission.HmdaRawDataEvents.{ LineAdded, LinesAdded } import hmda.model.processing.state.HmdaRawDataState -import hmda.persistence.serialization.raw.data.events.{ HmdaRawDataStateMessage, LineAddedMessage } +import hmda.persistence.serialization.raw.data.events.{ HmdaRawDataStateMessage, LineAddedMessage, LinesAddedMessage } import hmda.serialization.submission.HmdaRawDataEventsProtobufConverter._ class HmdaRawDataEventsSerializer extends SerializerWithStringManifest { override def identifier: Int = 114 final val LineAddedManifest = classOf[LineAdded].getName + final val LinesAddedManifest = classOf[LinesAdded].getName final val HmdaRawDataStateManifest = classOf[HmdaRawDataState].getName override def manifest(o: AnyRef): String = o.getClass.getName @@ -19,6 +19,8 @@ class HmdaRawDataEventsSerializer extends SerializerWithStringManifest { override def toBinary(o: AnyRef): Array[Byte] = o match { case evt: LineAdded => lineAddedToProtobuf(evt).toByteArray + case evt: LinesAdded => + linesAddedToProtobuf(evt).toByteArray case evt: HmdaRawDataState => rawDataStateToProtobuf(evt).toByteArray case _ => @@ -29,6 +31,8 @@ class HmdaRawDataEventsSerializer extends SerializerWithStringManifest { manifest match { case LineAddedManifest => lineAddedFromProtobuf(LineAddedMessage.parseFrom(bytes)) + case LinesAddedManifest => + linesAddedFromProtobuf(LinesAddedMessage.parseFrom(bytes)) case HmdaRawDataStateManifest => rawDataStateFromProtobuf(HmdaRawDataStateMessage.parseFrom(bytes)) case _ => diff --git a/hmda/src/main/scala/hmda/api/http/filing/submissions/UploadHttpApi.scala b/hmda/src/main/scala/hmda/api/http/filing/submissions/UploadHttpApi.scala index 5124d1c8e6..91510d647a 100644 --- a/hmda/src/main/scala/hmda/api/http/filing/submissions/UploadHttpApi.scala +++ b/hmda/src/main/scala/hmda/api/http/filing/submissions/UploadHttpApi.scala @@ -1,9 +1,8 @@ package hmda.api.http.filing.submissions import java.time.Instant - import akka.NotUsed -import akka.actor.typed.ActorSystem +import akka.actor.typed.{ ActorRef, ActorSystem } import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, EntityRef } import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.model.{ StatusCodes, Uri } @@ -168,13 +167,13 @@ private class UploadHttpApi(log: Logger, sharding: ClusterSharding)( } } - private def uploadFile(submissionId: SubmissionId, hmdaRaw: EntityRef[HmdaRawDataCommand]): Flow[String, Seq[HmdaRawDataEvent], NotUsed] = + private def uploadFile(submissionId: SubmissionId, hmdaRaw: EntityRef[HmdaRawDataCommand]): Flow[String, HmdaRawDataEvent, NotUsed] = Flow[String] .grouped(100) .mapAsync(1)(lines => persistLines(hmdaRaw, submissionId, lines)) - private def persistLines(entityRef: EntityRef[HmdaRawDataCommand], submissionId: SubmissionId, data: Seq[String]): Future[Seq[HmdaRawDataEvent]] = { - val response: Future[Seq[HmdaRawDataEvent]] = entityRef ? (ref => AddLines(submissionId, Instant.now.toEpochMilli, data, Some(ref))) + private def persistLines(entityRef: EntityRef[HmdaRawDataCommand], submissionId: SubmissionId, data: Seq[String]): Future[HmdaRawDataEvent] = { + val response: Future[HmdaRawDataEvent] = entityRef ? ((ref: ActorRef[HmdaRawDataEvent]) => AddLines(submissionId, Instant.now.toEpochMilli, data, Some(ref))) response } } \ No newline at end of file diff --git a/hmda/src/main/scala/hmda/persistence/submission/HmdaRawData.scala b/hmda/src/main/scala/hmda/persistence/submission/HmdaRawData.scala index 1613de3a1a..60c485b22f 100644 --- a/hmda/src/main/scala/hmda/persistence/submission/HmdaRawData.scala +++ b/hmda/src/main/scala/hmda/persistence/submission/HmdaRawData.scala @@ -8,7 +8,7 @@ import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, RetentionCriteria } import hmda.messages.submission.HmdaRawDataCommands.{ AddLines, HmdaRawDataCommand, StopRawData } -import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded } +import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded, LinesAdded } import hmda.model.filing.submission.SubmissionId import hmda.model.processing.state.HmdaRawDataState import hmda.persistence.HmdaTypedPersistentActor @@ -38,7 +38,7 @@ object HmdaRawData extends HmdaTypedPersistentActor[HmdaRawDataCommand, HmdaRawD log.debug(s"Persisted: $data") maybeReplyTo match { case Some(replyTo) => - replyTo ! evts + replyTo ! LinesAdded(timestamp, data) case None => //Do Nothing } } @@ -50,6 +50,7 @@ object HmdaRawData extends HmdaTypedPersistentActor[HmdaRawDataCommand, HmdaRawD override def eventHandler: (HmdaRawDataState, HmdaRawDataEvent) => HmdaRawDataState = { case (state, evt @ LineAdded(_, _)) => state.update(evt) + case (state, evt @ LinesAdded(_, _)) => state.update(evt) } def startShardRegion(sharding: ClusterSharding): ActorRef[ShardingEnvelope[HmdaRawDataCommand]] = diff --git a/hmda/src/test/scala/hmda/persistence/submission/HmdaRawDataSpec.scala b/hmda/src/test/scala/hmda/persistence/submission/HmdaRawDataSpec.scala index 66db0de19f..8958cbd3e9 100644 --- a/hmda/src/test/scala/hmda/persistence/submission/HmdaRawDataSpec.scala +++ b/hmda/src/test/scala/hmda/persistence/submission/HmdaRawDataSpec.scala @@ -1,7 +1,6 @@ package hmda.persistence.submission import java.time.Instant - import akka.actor import akka.actor.testkit.typed.scaladsl.TestProbe import hmda.persistence.AkkaCassandraPersistenceSpec @@ -9,7 +8,7 @@ import akka.actor.typed.scaladsl.adapter._ import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.typed.{ Cluster, Join } import hmda.messages.submission.HmdaRawDataCommands.AddLines -import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded } +import hmda.messages.submission.HmdaRawDataEvents.{ HmdaRawDataEvent, LineAdded, LinesAdded } import hmda.model.filing.submission.SubmissionId import hmda.utils.YearUtils.Period @@ -20,7 +19,7 @@ class HmdaRawDataSpec extends AkkaCassandraPersistenceSpec { val sharding = ClusterSharding(typedSystem) HmdaRawData.startShardRegion(sharding) - val hmdaRawProbe = TestProbe[Seq[HmdaRawDataEvent]] + val hmdaRawProbe = TestProbe[HmdaRawDataEvent] val submissionId = SubmissionId("12345", Period(2018, None), 1) @@ -33,8 +32,7 @@ class HmdaRawDataSpec extends AkkaCassandraPersistenceSpec { val timestamp = Instant.now.toEpochMilli hmdaRawData ! AddLines(submissionId, timestamp, List("data1", "data2"), Some(hmdaRawProbe.ref)) - - hmdaRawProbe.expectMessage(Seq(LineAdded(timestamp, "data1"), LineAdded(timestamp, "data2"))) + hmdaRawProbe.expectMessage(LinesAdded(timestamp, Seq("data1", "data2"))) } } diff --git a/hmda/src/test/scala/hmda/persistence/submission/HmdaValidationErrorSpec.scala b/hmda/src/test/scala/hmda/persistence/submission/HmdaValidationErrorSpec.scala index c473fdc5d3..c8d4f95a53 100644 --- a/hmda/src/test/scala/hmda/persistence/submission/HmdaValidationErrorSpec.scala +++ b/hmda/src/test/scala/hmda/persistence/submission/HmdaValidationErrorSpec.scala @@ -64,11 +64,11 @@ class HmdaValidationErrorSpec extends AkkaCassandraPersistenceSpec with ScalaFut StreamConverters .fromInputStream(() => getClass.getResourceAsStream("/error_test_files/trigger_s304_s305.txt")) .via(framing("\n")) - .mapAsync(1)(data => hmdaRawData ? ((ref: ActorRef[Seq[HmdaRawDataEvent]]) => + .mapAsync(1)(data => hmdaRawData ? ((ref: ActorRef[HmdaRawDataEvent]) => AddLines(submissionId, Instant.now.toEpochMilli, List(data.utf8String), Some(ref)))) .run() .futureValue - + import ValidationProgress._ // subscribe to progress updates; we expect an initial progress message, where no validation stage has yet started