Skip to content

Commit a8e2939

Browse files
committed
Add file source
1 parent e3c0e90 commit a8e2939

File tree

1 file changed

+73
-22
lines changed

1 file changed

+73
-22
lines changed
Lines changed: 73 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,111 @@
11
package hmda.loader.lar
22

3+
import java.io.File
4+
35
import akka.actor.{ActorPath, ActorRef, ActorSystem}
46
import akka.pattern.ask
57
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
8+
import akka.stream.ActorMaterializer
9+
import akka.stream.scaladsl.{FileIO, Sink}
610
import akka.util.Timeout
711
import com.typesafe.config.ConfigFactory
8-
import hmda.model.fi.SubmissionId
9-
import hmda.persistence.HmdaSupervisor.{FindHmdaFiling, FindProcessingActor, FindSubmissions}
12+
import hmda.api.util.FlowUtils
13+
import hmda.model.fi.{Created, Submission, SubmissionId}
14+
import hmda.persistence.HmdaSupervisor.{FindProcessingActor, FindSubmissions}
1015
import hmda.persistence.institutions.SubmissionPersistence
16+
import hmda.persistence.institutions.SubmissionPersistence.GetSubmissionById
1117
import hmda.persistence.processing.SubmissionManager
12-
import scala.concurrent.duration._
13-
14-
object HmdaLarLoader extends App {
18+
import org.slf4j.LoggerFactory
1519

16-
val config = ConfigFactory.load()
20+
import scala.concurrent.duration._
21+
import scala.util.{Failure, Success}
1722

18-
implicit val system = ActorSystem("hmda-cluster-client")
19-
implicit val ec = system.dispatcher
23+
object HmdaLarLoader extends FlowUtils {
2024

25+
override implicit val system = ActorSystem("hmda-cluster-client")
26+
override implicit val materializer = ActorMaterializer()
27+
override implicit val ec = system.dispatcher
2128
val hmdaClusterName = config.getString("hmda.clusterName")
2229
val hmdaClusterIP = config.getString("hmda.lar.host")
2330
val hmdaClusterPort = config.getInt("hmda.lar.port")
2431
val actorTimeout = config.getInt("hmda.actorTimeout")
2532

2633
implicit val timeout = Timeout(actorTimeout.seconds)
2734

35+
val log = LoggerFactory.getLogger("hmda-lar-loader")
36+
2837
val initialContacts = Set(
2938
ActorPath.fromString(s"akka.tcp://$hmdaClusterName@$hmdaClusterIP:$hmdaClusterPort/system/receptionist")
3039
)
3140

32-
println(initialContacts)
33-
3441
val settings = ClusterClientSettings(system)
3542
.withInitialContacts(initialContacts)
3643

3744
val clusterClient = system.actorOf(ClusterClient.props(settings), "hmda-lar-loader")
3845

39-
val institutionId = "institutionID"
40-
val period = "2017"
46+
def main(args: Array[String]): Unit = {
47+
48+
if (args.length < 1) {
49+
exitSys(log, "No File argument provided", 1)
50+
}
51+
52+
val file = new File(args(0))
53+
if (!file.exists() || !file.isFile) {
54+
exitSys(log, "File does not exist", 2)
55+
}
56+
57+
val source = FileIO.fromPath(file.toPath)
58+
59+
source.take(1)
60+
.runWith(Sink.foreach(println))
61+
62+
63+
64+
65+
66+
val institutionId = "institutionID"
67+
val period = "2017"
68+
69+
//processLars(institutionId, period)
70+
71+
}
72+
73+
private def processLars(institutionId: String, period: String) = {
74+
val submissionId = SubmissionId(institutionId, period)
75+
76+
val message = FindProcessingActor(SubmissionManager.name, submissionId)
4177

42-
val submissionId = SubmissionId(institutionId, period)
78+
val fProcessingActor = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", message, localAffinity = true)).mapTo[ActorRef]
79+
val fSubmissionsActor = (clusterClient ? ClusterClient
80+
.Send("/user/supervisor/singleton",
81+
FindSubmissions(SubmissionPersistence.name, institutionId, period),
82+
localAffinity = true)).mapTo[ActorRef]
4383

44-
val message = FindProcessingActor(SubmissionManager.name, submissionId)
84+
//TODO: Do we need this to load previous year's data?
85+
//(clusterClient ? ClusterClient.Send("/user/supervisor/singleton", FindHmdaFiling(period), localAffinity = true)).mapTo[ActorRef]
86+
//(clusterClient ? ClusterClient.Send("/user/query-supervisor", FindHmdaFilingView(period), localAffinity = true)).mapTo[ActorRef]
4587

46-
val fProcessingActor = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", message, localAffinity = true)).mapTo[ActorRef]
47-
val fSubmissionsActor = (clusterClient ? ClusterClient
48-
.Send("/user/supervisor/singleton",
49-
FindSubmissions(SubmissionPersistence.name, institutionId, period),
50-
localAffinity = true)).mapTo[ActorRef]
88+
val fUploadSubmission = for {
89+
p <- fProcessingActor
90+
s <- fSubmissionsActor
91+
fSubmission <- (s ? GetSubmissionById(submissionId)).mapTo[Submission]
92+
} yield (fSubmission, fSubmission.status == Created, p)
5193

52-
(clusterClient ? ClusterClient.Send("/user/supervisor/singleton", FindHmdaFiling(period), localAffinity = true)).mapTo[ActorRef]
53-
//(clusterClient ? ClusterClient.Send("/user/query-supervisor", FindHmdaFilingView(period), localAffinity = true)).mapTo[ActorRef]
94+
fUploadSubmission.onComplete {
95+
case Success((submission, true, processingActor)) =>
96+
uploadData(processingActor, 0L, submission)
5497

98+
case Success((_, false, _)) =>
99+
log.error("submission not available for upload")
100+
sys.exit(0)
55101

102+
case Failure(error) =>
103+
log.error(error.getLocalizedMessage)
104+
sys.exit(0)
56105

106+
}
107+
}
57108

58-
private def uploadFile(): Unit = ???
109+
private def uploadData(processingActor: ActorRef, uploadTimestamp: Long, submission:Submission ): Unit = ???
59110

60111
}

0 commit comments

Comments
 (0)