Skip to content

Commit 5caadc0

Browse files
committed
Add cluster name and filing creation
1 parent 7434948 commit 5caadc0

File tree

2 files changed

+16
-7
lines changed

2 files changed

+16
-7
lines changed

loader/src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ akka {
99

1010
hmda {
1111
clusterName = "hmda"
12+
clusterName = ${?HMDA_CLUSTER_NAME}
1213
adminUrl = "http://0.0.0.0:8081"
1314
adminUrl = ${?HMDA_HTTP_ADMIN_URL}
1415
actorTimeout = 5

loader/src/main/scala/hmda/loader/lar/HmdaLarLoader.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import akka.stream.{ ActorMaterializer, IOResult }
1010
import akka.stream.scaladsl.{ FileIO, Sink, Source }
1111
import akka.util.{ ByteString, Timeout }
1212
import hmda.api.util.FlowUtils
13-
import hmda.model.fi.{ Created, Submission }
14-
import hmda.persistence.HmdaSupervisor.{ FindHmdaFiling, FindProcessingActor, FindSubmissions }
15-
import hmda.persistence.institutions.SubmissionPersistence
13+
import hmda.model.fi.{ Created, Filing, Submission }
14+
import hmda.persistence.HmdaSupervisor.{ FindFilings, FindHmdaFiling, FindProcessingActor, FindSubmissions }
15+
import hmda.persistence.institutions.FilingPersistence.CreateFiling
16+
import hmda.persistence.institutions.{ FilingPersistence, SubmissionPersistence }
1617
import hmda.persistence.institutions.SubmissionPersistence.CreateSubmission
1718
import hmda.persistence.processing.HmdaRawFile.AddLine
1819
import hmda.persistence.processing.ProcessingMessages.{ CompleteUpload, Persisted, StartUpload }
@@ -92,10 +93,17 @@ object HmdaLarLoader extends FlowUtils {
9293

9394
fUploadSubmission.onComplete {
9495
case Success((submission, true)) =>
95-
val message = FindProcessingActor(SubmissionManager.name, submission.id)
96-
val fProcessingActor = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", message, localAffinity = true)).mapTo[ActorRef]
97-
fProcessingActor.onComplete { processingActor =>
98-
uploadData(processingActor.getOrElse(ActorRef.noSender), uploadTimestamp, fileName, submission, source)
96+
val findSubmissionManager = FindProcessingActor(SubmissionManager.name, submission.id)
97+
val findFilingPersistence = FindFilings(FilingPersistence.name, institutionId)
98+
val fProcessingActor = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", findSubmissionManager, localAffinity = true)).mapTo[ActorRef]
99+
val fFilingPersistence = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", findFilingPersistence, localAffinity = true)).mapTo[ActorRef]
100+
101+
for {
102+
f <- fFilingPersistence
103+
p <- fProcessingActor
104+
_ <- (f ? CreateFiling(Filing(period, institutionId))).mapTo[Option[Filing]]
105+
} yield {
106+
uploadData(p, uploadTimestamp, fileName, submission, source)
99107
}
100108

101109
case Success((_, false)) =>

0 commit comments

Comments
 (0)