Skip to content

Commit 4d59211

Browse files
committed
Ensure all parsing is finished before beginning validation
1 parent 6b7f903 commit 4d59211

File tree

1 file changed

+16
-7
lines changed

1 file changed

+16
-7
lines changed

persistence/src/main/scala/hmda/persistence/processing/HmdaFileParser.scala

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ object HmdaFileParser {
2828

2929
case class ReadHmdaRawFile(persistenceId: String, replyTo: ActorRef) extends Command
3030
case class FinishParsing(replyTo: ActorRef) extends Command
31+
case class FinishParsingTS(replyTo: ActorRef) extends Command
32+
case class FinishParsingLARs(replyTo: ActorRef) extends Command
3133
case class GetStatePaginated(page: Int)
3234

3335
def props(id: SubmissionId): Props = Props(new HmdaFileParser(id))
@@ -68,6 +70,9 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
6870
stat
6971
}
7072

73+
var tsParsingDone: Boolean = false
74+
var larParsingDone: Boolean = false
75+
7176
override def updateState(event: Event): Unit = {
7277
state = state.updated(event)
7378
}
@@ -77,7 +82,6 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
7782
override def receiveCommand: Receive = {
7883

7984
case ReadHmdaRawFile(persistenceId, replyTo: ActorRef) =>
80-
8185
val parsedTs = events(persistenceId)
8286
.filter { x => x.isInstanceOf[LineAdded] }
8387
.map { case LineAdded(_, data) => data }
@@ -91,7 +95,7 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
9195
}
9296

9397
parsedTs
94-
.runForeach(pTs => self ! pTs)
98+
.runWith(Sink.actorRef(self, FinishParsingTS(replyTo)))
9599

96100
val parsedLar = events(persistenceId)
97101
.filter { x => x.isInstanceOf[LineAdded] }
@@ -112,7 +116,7 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
112116

113117
parsedLar
114118
.mapAsync(parallelism = flowParallelism)(x => (self ? x).mapTo[Persisted.type])
115-
.runWith(Sink.actorRef(self, FinishParsing(replyTo)))
119+
.runWith(Sink.actorRef(self, FinishParsingLARs(replyTo)))
116120

117121
case tp @ TsParsed(ts) =>
118122
persist(tp) { e =>
@@ -140,11 +144,16 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
140144
sender() ! Persisted
141145
}
142146

143-
case FinishParsing(replyTo) =>
144-
for {
145-
stat <- statRef
146-
} yield stat ! CountSubmittedLarsInSubmission
147+
case FinishParsingTS(replyTo) =>
148+
tsParsingDone = true
149+
if (larParsingDone) self ! FinishParsing(replyTo)
147150

151+
case FinishParsingLARs(replyTo) =>
152+
larParsingDone = true
153+
if (tsParsingDone) self ! FinishParsing(replyTo)
154+
155+
case FinishParsing(replyTo) =>
156+
statRef.map(_ ! CountSubmittedLarsInSubmission)
148157
if (encounteredParsingErrors)
149158
replyTo ! ParsingCompletedWithErrors(submissionId)
150159
else

0 commit comments

Comments
 (0)