Skip to content

Commit a5e63b3

Browse files
author
Nick Grippin
authored
Merge pull request cfpb#1207 from jmarin/2016-lar-loader
2016 lar loader
2 parents 9fc1dca + 32ccb02 commit a5e63b3

File tree

11 files changed

+227
-4
lines changed

11 files changed

+227
-4
lines changed

Documents/2016_Data_load.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# 2016 HMDA DATA LOAD
2+
3+
The 2016 LAR data is necessary for some macro edits that compare the submitted file with the previous year's data.
4+
5+
In order to load the data, the 2017 panel must already be loaded. The LAR Loader will iterate through a file directory where the `HMDA` files reside,
6+
and send their contents to the `HMDA Platform`. The `HMDA` files must be in the pipe delimited format, with `.txt` extension and contain both Transmittal Sheet
7+
and Loan Application Register data, as per the [2017 Filing Instruction Guide](https://www.consumerfinance.gov/data-research/hmda/static/for-filers/2017/2017-HMDA-FIG.pdf)
8+
9+
10+
## Loading LAR data
11+
12+
By default, the loader will use `localhost` as the host where the HMDA Platform cluster is running. For remote systems, use the `HMDA_CLUSTER_HOST` environment variable.
13+
14+
When starting the cluster, take note of the port that Akka uses to communicate (e.g. 2551). This value needs to be set in an environment variable.
15+
The persistence cluster role needs to be active, and its dependencies (`Zookeeper`, `Cassandra`) running and properly configured.
16+
17+
```shell
18+
$ export HMDA_CLUSTER_PORT=2551
19+
$ sbt
20+
> project loader
21+
> run /tmp/sample_lar 2016
22+
```
23+
24+
This will connect to the running cluster and send the contents of each file in the directory to the appropriate Actor responsible for storing `LARs` in the 2016 period.
25+
26+
## Troubleshooting
27+
28+
* In cases where there is no connection, make sure that the `HmdaSupervisor` and `HmdaQuerySupervisor` are receiving messages from the cluster client. Both of these actors
29+
need to have the Cluster Receptionist enabled. For more information please consult the current Akka documentation on [Cluster Client](https://doc.akka.io/docs/akka/current/scala/cluster-client.html)
30+
31+
* When loading a large number of files, it might be necessary to increase the `hmda.persistent-actor-timeout` in the `Persistence-model` project's configuration file (or by passing the relevant
32+
runtime flag in a deployed application).
33+

api/src/main/scala/hmda/api/tcp/admin/InstitutionAdminTcpApi.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils
4646
.via(byte2StringFlow)
4747
.map(x => InstitutionParser(x))
4848
.mapAsync(parallelism = buffer)(i => createInstitution(i))
49-
.mapAsync(parallelism = buffer)(i => createFiling(i))
49+
.mapAsync(parallelism = buffer)(i => createFilings(i))
5050
.map(e => ByteString(e.toString))
5151

5252
override val tcp: Future[Tcp.ServerBinding] = Tcp().bindAndHandle(
@@ -67,13 +67,17 @@ class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils
6767
} yield i
6868
}
6969

70-
private def createFiling(institution: Institution): Future[Filing] = {
70+
private def createFilings(institution: Institution): Future[Filing] = {
7171
val fFilingPersistence = (supervisor ? FindFilings(FilingPersistence.name, institution.id)).mapTo[ActorRef]
7272
for {
7373
actor <- fFilingPersistence
7474
f <- (actor ? CreateFiling(Filing(institution.activityYear.toString, institution.id)))
7575
.mapTo[Option[Filing]]
7676
.map(x => x.getOrElse(Filing()))
77+
78+
_ <- (actor ? CreateFiling(Filing((institution.activityYear - 1).toString, institution.id)))
79+
.mapTo[Option[Filing]]
80+
.map(x => x.getOrElse(Filing()))
7781
} yield f
7882
}
7983

api/src/test/resources/application.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@ akka {
44
testKit.filter-leeway = 10s
55
http.parsing.max-content-length = 1G
66
http.server.default-host-header = "cfpb.gov"
7+
actor {
8+
provider = cluster
9+
}
10+
remote {
11+
netty.tcp {
12+
port = 0
13+
}
14+
}
715
}
816

917
hmda {

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ lazy val loader = (project in file("loader"))
147147
)
148148
).dependsOn(parserJVM % "compile->compile;test->test")
149149
.dependsOn(apiModel % "compile->compile;test->test")
150+
.dependsOn(query)
150151

151152
lazy val persistenceModel = (project in file("persistence-model"))
152153
.settings(hmdaBuildSettings:_*)

cluster/src/main/resources/application.conf

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
akka {
22
loggers = ["akka.event.slf4j.Slf4jLogger"]
33
loglevel = "DEBUG"
4+
loglevel = ${?HMDA_LOGLEVEL}
45
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
56
http.parsing.max-content-length = 1G
67
http.server.default-host-header = "cfpb.gov"
@@ -39,7 +40,7 @@ akka {
3940

4041
}
4142

42-
extensions = ["de.heikoseeberger.constructr.ConstructrExtension"]
43+
extensions = ["de.heikoseeberger.constructr.ConstructrExtension", "akka.cluster.client.ClusterClientReceptionist"]
4344

4445
}
4546

@@ -64,6 +65,7 @@ clustering {
6465
name = "hmda"
6566
name = ${?HMDA_CLUSTER_NAME}
6667
ip = "127.0.0.1"
68+
ip = ${?HMDA_CLUSTER_IP}
6769
port = 0
6870
port = ${?APP_PORT}
6971
}

loader/src/main/resources/application.conf

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
akka {
2-
loglevel = "INFO"
2+
loglevel = "DEBUG"
33
loglevel = ${?HMDA_LOGLEVEL}
44
actor.warn-about-java-serializer-usage = off
5+
actor.provider = cluster
6+
7+
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
58
}
69

710
hmda {
11+
clusterName = "hmda"
12+
clusterName = ${?HMDA_CLUSTER_NAME}
813
adminUrl = "http://0.0.0.0:8081"
914
adminUrl = ${?HMDA_HTTP_ADMIN_URL}
15+
actorTimeout = 5
1016
httpTimeout = 5
1117
panel {
1218
tcp {
@@ -16,5 +22,12 @@ hmda {
1622
port = ${?HMDA_PANEL_LOADER_PORT}
1723
}
1824
}
25+
lar {
26+
host = "127.0.0.1"
27+
host = ${?HMDA_CLUSTER_HOST}
28+
port = 0
29+
port = ${?HMDA_CLUSTER_PORT}
30+
parallelism = 5
31+
}
1932
}
2033

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package hmda.loader.lar
2+
3+
import java.io.File
4+
import java.time.Instant
5+
6+
import akka.pattern.ask
7+
import akka.actor.{ ActorPath, ActorRef, ActorSystem }
8+
import akka.cluster.client.{ ClusterClient, ClusterClientSettings }
9+
import akka.stream.{ ActorMaterializer, IOResult }
10+
import akka.stream.scaladsl.{ FileIO, Sink, Source }
11+
import akka.util.{ ByteString, Timeout }
12+
import hmda.api.util.FlowUtils
13+
import hmda.model.fi.{ Created, Filing, Submission }
14+
import hmda.persistence.HmdaSupervisor.{ FindFilings, FindHmdaFiling, FindProcessingActor, FindSubmissions }
15+
import hmda.persistence.institutions.FilingPersistence.CreateFiling
16+
import hmda.persistence.institutions.SubmissionPersistence.CreateSubmission
17+
import hmda.persistence.institutions.{ FilingPersistence, SubmissionPersistence }
18+
import hmda.persistence.processing.HmdaRawFile.AddLine
19+
import hmda.persistence.processing.ProcessingMessages.{ CompleteUpload, Persisted, StartUpload }
20+
import hmda.persistence.processing.SubmissionManager
21+
import hmda.persistence.messages.CommonMessages._
22+
import hmda.persistence.processing.SubmissionManager.AddFileName
23+
import hmda.query.HmdaQuerySupervisor.FindHmdaFilingView
24+
import org.slf4j.LoggerFactory
25+
26+
import scala.concurrent.duration._
27+
import scala.concurrent.{ ExecutionContext, Future }
28+
import scala.util.{ Failure, Success }
29+
30+
object HmdaBatchLarLoader extends FlowUtils {

  override implicit val system: ActorSystem = ActorSystem("hmda-cluster-client")
  override implicit val materializer: ActorMaterializer = ActorMaterializer()
  override implicit val ec: ExecutionContext = system.dispatcher

  val hmdaClusterName = config.getString("hmda.clusterName")
  val hmdaClusterIP = config.getString("hmda.lar.host")
  val hmdaClusterPort = config.getInt("hmda.lar.port")
  val actorTimeout = config.getInt("hmda.actorTimeout")
  val flowParallelism = config.getInt("hmda.lar.parallelism")

  implicit val timeout = Timeout(actorTimeout.seconds)

  val log = LoggerFactory.getLogger("hmda-lar-loader")

  // Initial contact point: the cluster receptionist on the remote HMDA node,
  // resolved from the hmda.clusterName / hmda.lar.* configuration values.
  val initialContacts = Set(
    ActorPath.fromString(s"akka.tcp://$hmdaClusterName@$hmdaClusterIP:$hmdaClusterPort/system/receptionist")
  )

  val settings = ClusterClientSettings(system)
    .withInitialContacts(initialContacts)

  val clusterClient = system.actorOf(ClusterClient.props(settings), "hmda-lar-loader")

  /**
   * Entry point. Expects two arguments: the full path to a directory holding
   * pipe-delimited `.txt` HMDA files, and the filing period (e.g. "2016").
   * Each matching file is streamed into the running HMDA cluster.
   */
  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      exitSys(log, "Please provide a directory containing files with LAR data and period to process", 1)
    }

    val path = new File(args(0))
    if (!path.isDirectory) {
      exitSys(log, "Argument must be the full path to a directory containing files with LAR data", 2)
    }

    val period = args(1)

    // Only `.txt` files are treated as submission files; everything else is ignored.
    val larFiles = path.listFiles().filter(_.getName.endsWith(".txt"))

    larFiles.foreach { file =>
      val fileName = file.getName
      // Institution id is the file name up to the first dot (extension stripped).
      val institutionId = fileName.substring(0, fileName.indexOf("."))
      // Use the File handle directly rather than re-building the path with a
      // hard-coded "/" separator (fixes platform-dependent path construction).
      processLars(file, fileName, institutionId, period)
    }
  }

  /**
   * Creates a submission for the given institution and period and, when the
   * submission is freshly `Created`, streams the file contents to the cluster.
   * Exits the JVM with a NON-ZERO status when the submission cannot be set up
   * (previously exited with 0, falsely signalling success to the shell).
   */
  private def processLars(file: File, fileName: String, institutionId: String, period: String) = {
    val uploadTimestamp = Instant.now.toEpochMilli
    val source = FileIO.fromPath(file.toPath)

    val fSubmissionsActor = (clusterClient ? ClusterClient
      .Send(
        "/user/supervisor/singleton",
        FindSubmissions(SubmissionPersistence.name, institutionId, period),
        localAffinity = true
      )).mapTo[ActorRef]

    // Ensure the filing and filing-view actors for this period are started on
    // the cluster. These lookups are fire-and-forget, but failures are now
    // logged instead of being silently discarded with the ignored futures.
    (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", FindHmdaFiling(period), localAffinity = true))
      .mapTo[ActorRef]
      .onComplete {
        case Failure(error) => log.error(s"Could not resolve HmdaFiling for $period: ${error.getLocalizedMessage}")
        case Success(_) => // actor resolved; nothing further to do
      }
    (clusterClient ? ClusterClient.Send("/user/query-supervisor", FindHmdaFilingView(period), localAffinity = true))
      .mapTo[ActorRef]
      .onComplete {
        case Failure(error) => log.error(s"Could not resolve HmdaFilingView for $period: ${error.getLocalizedMessage}")
        case Success(_) => // actor resolved; nothing further to do
      }

    // A submission is only eligible for upload when it was just created.
    val fUploadSubmission = for {
      s <- fSubmissionsActor
      fSubmission <- (s ? CreateSubmission).mapTo[Option[Submission]]
      submission = fSubmission.getOrElse(Submission())
    } yield (submission, submission.status == Created)

    fUploadSubmission.onComplete {
      case Success((submission, true)) =>
        val findSubmissionManager = FindProcessingActor(SubmissionManager.name, submission.id)
        val findFilingPersistence = FindFilings(FilingPersistence.name, institutionId)
        val fProcessingActor = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", findSubmissionManager, localAffinity = true)).mapTo[ActorRef]
        val fFilingPersistence = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", findFilingPersistence, localAffinity = true)).mapTo[ActorRef]

        // The filing for this period must exist before data is uploaded.
        for {
          f <- fFilingPersistence
          p <- fProcessingActor
          _ <- (f ? CreateFiling(Filing(period, institutionId))).mapTo[Option[Filing]]
        } yield {
          uploadData(p, uploadTimestamp, fileName, submission, source)
        }

      case Success((_, false)) =>
        log.error("submission not available for upload")
        sys.exit(3) // was sys.exit(0): a failure must not report success

      case Failure(error) =>
        log.error(error.getLocalizedMessage)
        sys.exit(4) // was sys.exit(0): a failure must not report success
    }
  }

  /**
   * Streams the raw file through the framing stage line by line and sends each
   * line to the processing actor. On success the upload is completed; on
   * failure the actor is shut down and the error logged.
   */
  private def uploadData(processingActor: ActorRef, uploadTimestamp: Long, fileName: String, submission: Submission, source: Source[ByteString, Future[IOResult]]): Unit = {
    processingActor ! AddFileName(fileName)
    processingActor ! StartUpload
    val uploadedF = source
      .via(framing)
      .map(_.utf8String)
      .mapAsync(parallelism = flowParallelism)(line => (processingActor ? AddLine(uploadTimestamp, line)).mapTo[Persisted.type])
      .runWith(Sink.ignore)

    uploadedF.onComplete {
      case Success(_) =>
        processingActor ! CompleteUpload

      case Failure(error) =>
        processingActor ! Shutdown
        log.error(error.getLocalizedMessage)
    }
  }

}

persistence/src/main/scala/hmda/persistence/HmdaSupervisor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package hmda.persistence
22

33
import akka.actor.{ ActorRef, ActorSystem, Props }
4+
import akka.cluster.client.ClusterClientReceptionist
45
import hmda.model.fi.SubmissionId
56
import hmda.persistence.institutions.{ FilingPersistence, InstitutionPersistence, SubmissionPersistence }
67
import hmda.persistence.model.HmdaSupervisorActor
@@ -25,6 +26,8 @@ class HmdaSupervisor(validationStats: ActorRef) extends HmdaSupervisorActor {
2526

2627
import HmdaSupervisor._
2728

29+
ClusterClientReceptionist(context.system).registerService(self)
30+
2831
override def receive: Receive = super.receive orElse {
2932

3033
case FindHmdaFiling(filingPeriod) =>

persistence/src/test/resources/application.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,15 @@ akka {
44
log-dead-letters = off
55
log-dead-letters-during-shutdown = off
66
actor.warn-about-java-serializer-usage = off
7+
8+
actor {
9+
provider = cluster
10+
}
11+
remote {
12+
netty.tcp {
13+
port = 0
14+
}
15+
}
716
}
817

918
akka.persistence.journal.plugin = "inmemory-journal"

query/src/main/scala/hmda/query/HmdaQuerySupervisor.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package hmda.query
22

33
import akka.actor.{ ActorRef, ActorSystem, Props }
4+
import akka.cluster.client.ClusterClientReceptionist
45
import akka.util.Timeout
56
import hmda.persistence.model.HmdaSupervisorActor
67
import hmda.query.view.filing.HmdaFilingView
78
import hmda.query.view.institutions.InstitutionView
89
import hmda.persistence.PersistenceConfig._
910
import hmda.persistence.messages.CommonMessages._
11+
1012
import scala.concurrent.duration._
1113

1214
object HmdaQuerySupervisor {
@@ -28,6 +30,8 @@ class HmdaQuerySupervisor extends HmdaSupervisorActor {
2830
implicit val timeout = Timeout(duration.seconds)
2931
implicit val ec = context.dispatcher
3032

33+
ClusterClientReceptionist(context.system).registerService(self)
34+
3135
override def receive: Receive = super.receive orElse {
3236
case FindHmdaFilingView(period) =>
3337
sender() ! findHmdaFilingView(period)

query/src/test/resources/application.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ akka {
88
persistence.snapshot-store.plugin = "inmemory-snapshot-store"
99
actor {
1010
timeout = 5
11+
provider = cluster
12+
}
13+
remote {
14+
netty.tcp {
15+
port = 0
16+
}
1117
}
1218
}
1319

0 commit comments

Comments
 (0)