Skip to content

Commit b45d047

Browse files
committed
Add batch lar loader
1 parent 5caadc0 commit b45d047

File tree

2 files changed

+20
-18
lines changed

2 files changed

+20
-18
lines changed

cluster/src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
akka {
22
loggers = ["akka.event.slf4j.Slf4jLogger"]
33
loglevel = "DEBUG"
4+
loglevel = ${?HMDA_LOGLEVEL}
45
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
56
http.parsing.max-content-length = 1G
67
http.server.default-host-header = "cfpb.gov"

loader/src/main/scala/hmda/loader/lar/HmdaLarLoader.scala renamed to loader/src/main/scala/hmda/loader/lar/HmdaBatchLarLoader.scala

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package hmda.loader.lar
33
import java.io.File
44
import java.time.Instant
55

6-
import akka.actor.{ ActorPath, ActorRef, ActorSystem }
76
import akka.pattern.ask
7+
import akka.actor.{ ActorPath, ActorRef, ActorSystem }
88
import akka.cluster.client.{ ClusterClient, ClusterClientSettings }
99
import akka.stream.{ ActorMaterializer, IOResult }
1010
import akka.stream.scaladsl.{ FileIO, Sink, Source }
@@ -13,8 +13,8 @@ import hmda.api.util.FlowUtils
1313
import hmda.model.fi.{ Created, Filing, Submission }
1414
import hmda.persistence.HmdaSupervisor.{ FindFilings, FindHmdaFiling, FindProcessingActor, FindSubmissions }
1515
import hmda.persistence.institutions.FilingPersistence.CreateFiling
16-
import hmda.persistence.institutions.{ FilingPersistence, SubmissionPersistence }
1716
import hmda.persistence.institutions.SubmissionPersistence.CreateSubmission
17+
import hmda.persistence.institutions.{ FilingPersistence, SubmissionPersistence }
1818
import hmda.persistence.processing.HmdaRawFile.AddLine
1919
import hmda.persistence.processing.ProcessingMessages.{ CompleteUpload, Persisted, StartUpload }
2020
import hmda.persistence.processing.SubmissionManager
@@ -23,15 +23,16 @@ import hmda.persistence.processing.SubmissionManager.AddFileName
2323
import hmda.query.HmdaQuerySupervisor.FindHmdaFilingView
2424
import org.slf4j.LoggerFactory
2525

26-
import scala.concurrent.{ Await, Future }
2726
import scala.concurrent.duration._
27+
import scala.concurrent.{ ExecutionContext, Future }
2828
import scala.util.{ Failure, Success }
2929

30-
object HmdaLarLoader extends FlowUtils {
30+
object HmdaBatchLarLoader extends FlowUtils {
31+
32+
override implicit val system: ActorSystem = ActorSystem("hmda-cluster-client")
33+
override implicit val materializer: ActorMaterializer = ActorMaterializer()
34+
override implicit val ec: ExecutionContext = system.dispatcher
3135

32-
override implicit val system = ActorSystem("hmda-cluster-client")
33-
override implicit val materializer = ActorMaterializer()
34-
override implicit val ec = system.dispatcher
3536
val hmdaClusterName = config.getString("hmda.clusterName")
3637
val hmdaClusterIP = config.getString("hmda.lar.host")
3738
val hmdaClusterPort = config.getInt("hmda.lar.port")
@@ -53,21 +54,21 @@ object HmdaLarLoader extends FlowUtils {
5354

5455
def main(args: Array[String]): Unit = {
5556

56-
if (args.length < 1) {
57-
exitSys(log, "No File argument provided", 1)
57+
if (args.length < 2) {
58+
exitSys(log, "Please provide a directory containing files with LAR data and period to process", 1)
5859
}
5960

60-
val file = new File(args(0))
61-
if (!file.exists() || !file.isFile) {
62-
exitSys(log, "File does not exist", 2)
61+
val path = new File(args(0))
62+
if (!path.isDirectory) {
63+
exitSys(log, "Argument must be the full path to a directory containing files with LAR data", 2)
6364
}
6465

65-
val fileName = file.getName
66-
val parts = fileName.split("_")
67-
val institutionId = parts.head
68-
val finalPart = parts.tail.head
69-
val period = finalPart.substring(0, finalPart.indexOf("."))
70-
processLars(file, fileName, institutionId, period)
66+
val period = args(1)
67+
68+
val fileList = path.listFiles().toSet
69+
val fileNames = fileList.map(file => file.getName).filter(name => name.endsWith(".txt"))
70+
71+
fileNames.foreach(fileName => processLars(new File(s"$path/$fileName"), s"$fileName", s"${fileName.substring(0, fileName.indexOf("."))}", period))
7172

7273
}
7374

0 commit comments

Comments
 (0)