Skip to content

Commit 77bae5a

Browse files
authored
Merge pull request cfpb#1131 from schbetsy/parsing-messages
Fix SubmissionSummary Intermittent Spec Failure
2 parents 7d91fb4 + c1f8974 commit 77bae5a

File tree

3 files changed

+38
-41
lines changed

3 files changed

+38
-41
lines changed

model/shared/src/test/scala/hmda/model/institution/SubmissionGenerators.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ object SubmissionGenerators {
3434
status <- submissionStatusGen
3535
start <- Gen.choose(1483287071000L, 1514736671000L)
3636
end <- Gen.choose(1483287071000L, 1514736671000L)
37+
fileName <- Gen.alphaStr
3738
receipt <- Gen.alphaStr
38-
} yield Submission(id, status, start, end, receipt)
39+
} yield Submission(id, status, start, end, fileName, receipt)
3940
}
4041
}

persistence/src/main/scala/hmda/persistence/processing/HmdaFileParser.scala

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ object HmdaFileParser {
2828

2929
case class ReadHmdaRawFile(persistenceId: String, replyTo: ActorRef) extends Command
3030
case class FinishParsing(replyTo: ActorRef) extends Command
31+
case class FinishParsingTS(replyTo: ActorRef) extends Command
32+
case class FinishParsingLARs(replyTo: ActorRef) extends Command
3133
case class GetStatePaginated(page: Int)
3234

3335
def props(id: SubmissionId): Props = Props(new HmdaFileParser(id))
@@ -62,11 +64,10 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
6264
var state = HmdaFileParseState()
6365
var encounteredParsingErrors: Boolean = false
6466
val manager = context.parent
65-
val statRef = for {
66-
stat <- (manager ? GetActorRef(SubmissionLarStats.name)).mapTo[ActorRef]
67-
} yield {
68-
stat
69-
}
67+
val statRef = (manager ? GetActorRef(SubmissionLarStats.name)).mapTo[ActorRef]
68+
69+
var tsParsingDone: Boolean = false
70+
var larParsingDone: Boolean = false
7071

7172
override def updateState(event: Event): Unit = {
7273
state = state.updated(event)
@@ -77,7 +78,6 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
7778
override def receiveCommand: Receive = {
7879

7980
case ReadHmdaRawFile(persistenceId, replyTo: ActorRef) =>
80-
8181
val parsedTs = events(persistenceId)
8282
.filter { x => x.isInstanceOf[LineAdded] }
8383
.map { case LineAdded(_, data) => data }
@@ -91,7 +91,7 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
9191
}
9292

9393
parsedTs
94-
.runForeach(pTs => self ! pTs)
94+
.runWith(Sink.actorRef(self, FinishParsingTS(replyTo)))
9595

9696
val parsedLar = events(persistenceId)
9797
.filter { x => x.isInstanceOf[LineAdded] }
@@ -100,7 +100,7 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
100100
.zip(Source.fromIterator(() => Iterator.from(2)))
101101
.map {
102102
case (lar, index) =>
103-
sendLar(lar)
103+
statRef.map(_ ! lar)
104104
LarCsvParser(lar, index)
105105
}
106106
.map {
@@ -112,7 +112,7 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
112112

113113
parsedLar
114114
.mapAsync(parallelism = flowParallelism)(x => (self ? x).mapTo[Persisted.type])
115-
.runWith(Sink.actorRef(self, FinishParsing(replyTo)))
115+
.runWith(Sink.actorRef(self, FinishParsingLARs(replyTo)))
116116

117117
case tp @ TsParsed(ts) =>
118118
persist(tp) { e =>
@@ -140,11 +140,16 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
140140
sender() ! Persisted
141141
}
142142

143-
case FinishParsing(replyTo) =>
144-
for {
145-
stat <- statRef
146-
} yield stat ! CountSubmittedLarsInSubmission
143+
case FinishParsingTS(replyTo) =>
144+
tsParsingDone = true
145+
if (larParsingDone) self ! FinishParsing(replyTo)
147146

147+
case FinishParsingLARs(replyTo) =>
148+
larParsingDone = true
149+
if (tsParsingDone) self ! FinishParsing(replyTo)
150+
151+
case FinishParsing(replyTo) =>
152+
statRef.map(_ ! CountSubmittedLarsInSubmission)
148153
if (encounteredParsingErrors)
149154
replyTo ! ParsingCompletedWithErrors(submissionId)
150155
else
@@ -164,11 +169,5 @@ class HmdaFileParser(submissionId: SubmissionId) extends HmdaPersistentActor {
164169
context stop self
165170

166171
}
167-
168-
private def sendLar(s: String) {
169-
for {
170-
stat <- statRef
171-
} yield stat ! s
172-
}
173172
}
174173

persistence/src/main/scala/hmda/persistence/processing/HmdaFileValidator.scala

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -139,45 +139,42 @@ class HmdaFileValidator(supervisor: ActorRef, validationStats: ActorRef, submiss
139139
override def receiveCommand: Receive = {
140140

141141
case BeginValidation(replyTo) =>
142-
val validationStarted = ValidationStarted(submissionId)
143-
sender() ! validationStarted
142+
sender() ! ValidationStarted(submissionId)
144143
events(parserPersistenceId)
145144
.filter(x => x.isInstanceOf[TsParsed])
146-
.map(e => e.asInstanceOf[TsParsed].ts)
147-
.map(ts => (ts, validateTs(ts, ctx).toEither))
145+
.map { e => e.asInstanceOf[TsParsed].ts }
146+
.map { ts =>
147+
self ! ts
148+
validationStats ! AddSubmissionTaxId(ts.taxId, submissionId)
149+
self ! ValidateAggregate(ts)
150+
validateTs(ts, ctx).toEither
151+
}
148152
.map {
149-
case (_, Right(ts)) =>
150-
validationStats ! AddSubmissionTaxId(ts.taxId, submissionId)
151-
ValidateAggregate(ts)
152-
case (ts, Left(errors)) =>
153-
validationStats ! AddSubmissionTaxId(ts.taxId, submissionId)
154-
self ! ValidateAggregate(ts)
155-
TsValidationErrors(errors.list.toList)
153+
case Right(_) => // do nothing
154+
case Left(errors) => TsValidationErrors(errors.list.toList)
156155
}
157156
.runWith(Sink.actorRef(self, NotUsed))
158157

159158
val larSource: Source[LoanApplicationRegister, NotUsed] = events(parserPersistenceId)
160159
.filter(x => x.isInstanceOf[LarParsed])
161160
.map(e => e.asInstanceOf[LarParsed].lar)
162161

163-
larSource.map(lar => (lar, validateLar(lar, ctx).toEither))
162+
larSource.map { lar =>
163+
self ! lar
164+
validateLar(lar, ctx).toEither
165+
}
164166
.map {
165-
case (_, Right(l)) => l
166-
case (lar, Left(errors)) => {
167-
self ! lar
168-
LarValidationErrors(errors.list.toList)
169-
}
167+
case Right(_) => // do nothing
168+
case Left(errors) => LarValidationErrors(errors.list.toList)
170169
}
171170
.runWith(Sink.actorRef(self, ValidateMacro(larSource, replyTo)))
172171

173172
case ValidateAggregate(ts) =>
174173
performAsyncChecks(ts, ctx)
175174
.map(validations => validations.toEither)
176175
.map {
177-
case Right(_) => self ! ts
178-
case Left(errors) =>
179-
self ! TsValidationErrors(errors.list.toList)
180-
self ! ts
176+
case Right(_) => // do nothing
177+
case Left(errors) => self ! TsValidationErrors(errors.list.toList)
181178
}
182179

183180
case ts: TransmittalSheet =>

0 commit comments

Comments
 (0)