
Commit 7434948

Add upload data
1 parent a8e2939 commit 7434948

4 files changed: +69 -43 lines changed


api/src/main/scala/hmda/api/tcp/admin/InstitutionAdminTcpApi.scala

Lines changed: 6 additions & 2 deletions
@@ -46,7 +46,7 @@ class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils
       .via(byte2StringFlow)
       .map(x => InstitutionParser(x))
       .mapAsync(parallelism = buffer)(i => createInstitution(i))
-      .mapAsync(parallelism = buffer)(i => createFiling(i))
+      .mapAsync(parallelism = buffer)(i => createFilings(i))
       .map(e => ByteString(e.toString))

   override val tcp: Future[Tcp.ServerBinding] = Tcp().bindAndHandle(
@@ -67,13 +67,17 @@ class InstitutionAdminTcpApi(supervisor: ActorRef) extends TcpApi with FlowUtils
     } yield i
   }

-  private def createFiling(institution: Institution): Future[Filing] = {
+  private def createFilings(institution: Institution): Future[Filing] = {
     val fFilingPersistence = (supervisor ? FindFilings(FilingPersistence.name, institution.id)).mapTo[ActorRef]
     for {
       actor <- fFilingPersistence
       f <- (actor ? CreateFiling(Filing(institution.activityYear.toString, institution.id)))
         .mapTo[Option[Filing]]
         .map(x => x.getOrElse(Filing()))
+
+      _ <- (actor ? CreateFiling(Filing((institution.activityYear - 1).toString, institution.id)))
+        .mapTo[Option[Filing]]
+        .map(x => x.getOrElse(Filing()))
     } yield f
   }
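
The renamed createFilings persists a filing for the institution's current activity year and an additional one for the previous year, while still yielding only the current-year filing back into the TCP flow. A minimal, self-contained sketch of that composition using plain Futures and simplified stand-in types (not the actual persistence actors):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object CreateFilingsSketch extends App {

  // Simplified stand-in for hmda.model.fi.Filing
  case class Filing(period: String = "", institutionId: String = "")

  // Stand-in for (actor ? CreateFiling(...)).mapTo[Option[Filing]]
  def persistFiling(filing: Filing): Future[Option[Filing]] =
    Future.successful(Some(filing))

  def createFilings(institutionId: String, activityYear: Int): Future[Filing] =
    for {
      current <- persistFiling(Filing(activityYear.toString, institutionId))
        .map(_.getOrElse(Filing()))
      // previous-year filing is created for its effect only; its value is discarded
      _ <- persistFiling(Filing((activityYear - 1).toString, institutionId))
        .map(_.getOrElse(Filing()))
    } yield current

  createFilings("12345", 2017).foreach(println) // Filing(2017,12345)
}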

build.sbt

Lines changed: 1 addition & 0 deletions
@@ -145,6 +145,7 @@ lazy val loader = (project in file("loader"))
     )
   ).dependsOn(parserJVM % "compile->compile;test->test")
   .dependsOn(apiModel % "compile->compile;test->test")
+  .dependsOn(query)

 lazy val persistenceModel = (project in file("persistence-model"))
   .settings(hmdaBuildSettings:_*)
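
The loader now imports hmda.query.HmdaQuerySupervisor.FindHmdaFilingView (see the loader diff below), so the loader project needs a compile dependency on the query project. A minimal sbt sketch of that wiring, with hypothetical stripped-down project definitions:

// build.sbt sketch (hypothetical minimal projects)
lazy val query = (project in file("query"))

lazy val loader = (project in file("loader"))
  .dependsOn(query) // default configuration: compile->compile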

loader/src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ hmda {
     host = ${?HMDA_CLUSTER_HOST}
     port = 0
     port = ${?HMDA_CLUSTER_PORT}
+    parallelism = 5
   }
 }
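
The new parallelism setting is read in the loader as config.getInt("hmda.lar.parallelism") and bounds the mapAsync stage of the upload stream (see the loader diff below). A small Typesafe Config sketch of resolving the value, assuming a standalone config string rather than the project's full application.conf:

import com.typesafe.config.ConfigFactory

object ParallelismConfigSketch extends App {
  // Hypothetical, self-contained stand-in for the hmda.lar config block
  val config = ConfigFactory.parseString(
    """
      |hmda {
      |  lar {
      |    parallelism = 5
      |  }
      |}
    """.stripMargin
  )

  val flowParallelism = config.getInt("hmda.lar.parallelism")
  println(s"mapAsync parallelism: $flowParallelism") // 5
}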

loader/src/main/scala/hmda/loader/lar/HmdaLarLoader.scala

Lines changed: 61 additions & 41 deletions
@@ -1,24 +1,30 @@
 package hmda.loader.lar

 import java.io.File
+import java.time.Instant

-import akka.actor.{ActorPath, ActorRef, ActorSystem}
+import akka.actor.{ ActorPath, ActorRef, ActorSystem }
 import akka.pattern.ask
-import akka.cluster.client.{ClusterClient, ClusterClientSettings}
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{FileIO, Sink}
-import akka.util.Timeout
-import com.typesafe.config.ConfigFactory
+import akka.cluster.client.{ ClusterClient, ClusterClientSettings }
+import akka.stream.{ ActorMaterializer, IOResult }
+import akka.stream.scaladsl.{ FileIO, Sink, Source }
+import akka.util.{ ByteString, Timeout }
 import hmda.api.util.FlowUtils
-import hmda.model.fi.{Created, Submission, SubmissionId}
-import hmda.persistence.HmdaSupervisor.{FindProcessingActor, FindSubmissions}
+import hmda.model.fi.{ Created, Submission }
+import hmda.persistence.HmdaSupervisor.{ FindHmdaFiling, FindProcessingActor, FindSubmissions }
 import hmda.persistence.institutions.SubmissionPersistence
-import hmda.persistence.institutions.SubmissionPersistence.GetSubmissionById
+import hmda.persistence.institutions.SubmissionPersistence.CreateSubmission
+import hmda.persistence.processing.HmdaRawFile.AddLine
+import hmda.persistence.processing.ProcessingMessages.{ CompleteUpload, Persisted, StartUpload }
 import hmda.persistence.processing.SubmissionManager
+import hmda.persistence.messages.CommonMessages._
+import hmda.persistence.processing.SubmissionManager.AddFileName
+import hmda.query.HmdaQuerySupervisor.FindHmdaFilingView
 import org.slf4j.LoggerFactory

+import scala.concurrent.{ Await, Future }
 import scala.concurrent.duration._
-import scala.util.{Failure, Success}
+import scala.util.{ Failure, Success }

 object HmdaLarLoader extends FlowUtils {

@@ -29,6 +35,7 @@ object HmdaLarLoader extends FlowUtils {
   val hmdaClusterIP = config.getString("hmda.lar.host")
   val hmdaClusterPort = config.getInt("hmda.lar.port")
   val actorTimeout = config.getInt("hmda.actorTimeout")
+  val flowParallelism = config.getInt("hmda.lar.parallelism")

   implicit val timeout = Timeout(actorTimeout.seconds)

@@ -54,48 +61,44 @@ object HmdaLarLoader extends FlowUtils {
       exitSys(log, "File does not exist", 2)
     }

-    val source = FileIO.fromPath(file.toPath)
-
-    source.take(1)
-      .runWith(Sink.foreach(println))
-
-
-
-
-
-    val institutionId = "institutionID"
-    val period = "2017"
-
-    //processLars(institutionId, period)
+    val fileName = file.getName
+    val parts = fileName.split("_")
+    val institutionId = parts.head
+    val finalPart = parts.tail.head
+    val period = finalPart.substring(0, finalPart.indexOf("."))
+    processLars(file, fileName, institutionId, period)

   }

-  private def processLars(institutionId: String, period: String) = {
-    val submissionId = SubmissionId(institutionId, period)
-
-    val message = FindProcessingActor(SubmissionManager.name, submissionId)
+  private def processLars(file: File, fileName: String, institutionId: String, period: String) = {
+    val uploadTimestamp = Instant.now.toEpochMilli
+    val source = FileIO.fromPath(file.toPath)

-    val fProcessingActor = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", message, localAffinity = true)).mapTo[ActorRef]
     val fSubmissionsActor = (clusterClient ? ClusterClient
-      .Send("/user/supervisor/singleton",
+      .Send(
+        "/user/supervisor/singleton",
         FindSubmissions(SubmissionPersistence.name, institutionId, period),
-        localAffinity = true)).mapTo[ActorRef]
+        localAffinity = true
+      )).mapTo[ActorRef]

-    //TODO: Do we need this to load previous year's data?
-    //(clusterClient ? ClusterClient.Send("/user/supervisor/singleton", FindHmdaFiling(period), localAffinity = true)).mapTo[ActorRef]
-    //(clusterClient ? ClusterClient.Send("/user/query-supervisor", FindHmdaFilingView(period), localAffinity = true)).mapTo[ActorRef]
+    (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", FindHmdaFiling(period), localAffinity = true)).mapTo[ActorRef]
+    (clusterClient ? ClusterClient.Send("/user/query-supervisor", FindHmdaFilingView(period), localAffinity = true)).mapTo[ActorRef]

     val fUploadSubmission = for {
-      p <- fProcessingActor
       s <- fSubmissionsActor
-      fSubmission <- (s ? GetSubmissionById(submissionId)).mapTo[Submission]
-    } yield (fSubmission, fSubmission.status == Created, p)
+      fSubmission <- (s ? CreateSubmission).mapTo[Option[Submission]]
+      submission = fSubmission.getOrElse(Submission())
+    } yield (submission, submission.status == Created)

     fUploadSubmission.onComplete {
-      case Success((submission, true, processingActor)) =>
-        uploadData(processingActor, 0L, submission)
-
-      case Success((_, false, _)) =>
+      case Success((submission, true)) =>
+        val message = FindProcessingActor(SubmissionManager.name, submission.id)
+        val fProcessingActor = (clusterClient ? ClusterClient.Send("/user/supervisor/singleton", message, localAffinity = true)).mapTo[ActorRef]
+        fProcessingActor.onComplete { processingActor =>
+          uploadData(processingActor.getOrElse(ActorRef.noSender), uploadTimestamp, fileName, submission, source)
+        }
+
+      case Success((_, false)) =>
         log.error("submission not available for upload")
         sys.exit(0)

@@ -106,6 +109,23 @@ object HmdaLarLoader extends FlowUtils {
     }
   }

-  private def uploadData(processingActor: ActorRef, uploadTimestamp: Long, submission:Submission ): Unit = ???
+  private def uploadData(processingActor: ActorRef, uploadTimestamp: Long, fileName: String, submission: Submission, source: Source[ByteString, Future[IOResult]]): Unit = {
+    processingActor ! AddFileName(fileName)
+    processingActor ! StartUpload
+    val uploadedF = source
+      .via(framing)
+      .map(_.utf8String)
+      .mapAsync(parallelism = flowParallelism)(line => (processingActor ? AddLine(uploadTimestamp, line)).mapTo[Persisted.type])
+      .runWith(Sink.ignore)
+
+    uploadedF.onComplete {
+      case Success(_) =>
+        processingActor ! CompleteUpload
+
+      case Failure(error) =>
+        processingActor ! Shutdown
+        log.error(error.getLocalizedMessage)
+    }
+  }

 }
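
The loader now derives the institution id and filing period from the upload file's name instead of hard-coded values; the parsing above implies an <institutionId>_<period>.<extension> naming convention. A small sketch of that parsing with a hypothetical file name:

object FileNameParsingSketch extends App {
  // Hypothetical example following the <institutionId>_<period>.<extension> convention
  val fileName = "12345_2017.txt"

  val parts = fileName.split("_")
  val institutionId = parts.head                              // "12345"
  val finalPart = parts.tail.head                             // "2017.txt"
  val period = finalPart.substring(0, finalPart.indexOf(".")) // "2017"

  println(s"institutionId=$institutionId, period=$period")
}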

0 commit comments
